kv: add all lock strengths and durabilities to TestLockTableConcurrentRequests #111465

Merged

148 changes: 92 additions & 56 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -722,30 +723,42 @@ func scanSpans(
         spanStr := parts[1]
         str := GetStrength(t, d, strS)
         // Compute latch span access based on the supplied strength.
-        var sa spanset.SpanAccess
-        switch str {
-        case lock.None:
-            sa = spanset.SpanReadOnly
-        case lock.Intent:
-            sa = spanset.SpanReadWrite
-        case lock.Exclusive:
-            sa = spanset.SpanReadWrite
-        case lock.Shared:
-            // Unlike non-locking reads, shared-locking reads are isolated at all
-            // timestamps (not just the request's timestamp); so we acquire a read
-            // latch at max timestamp. See
-            // https://github.com/cockroachdb/cockroach/issues/102264.
-            sa = spanset.SpanReadOnly
-            ts = hlc.MaxTimestamp
-        default:
-            d.Fatalf(t, "unsupported lock strength: %s", str)
-        }
-        latchSpans.AddMVCC(sa, getSpan(t, d, spanStr), ts)
+        sa, latchTs := latchAccessForLockStrength(str, ts)
+        latchSpans.AddMVCC(sa, getSpan(t, d, spanStr), latchTs)
         lockSpans.Add(str, getSpan(t, d, spanStr))
     }
     return latchSpans, lockSpans
 }
+
+// latchAccessForLockStrength returns the latch access and timestamp to use for
+// a given lock strength and request timestamp. It duplicates some of the logic
+// in DefaultDeclareIsolatedKeys to avoid the package dependency.
+func latchAccessForLockStrength(
+    str lock.Strength, ts hlc.Timestamp,
+) (spanset.SpanAccess, hlc.Timestamp) {
+    switch str {
+    case lock.None:
+        return spanset.SpanReadOnly, ts
+    case lock.Shared:
+        // Unlike non-locking reads, shared-locking reads are isolated at all
+        // timestamps (not just the request's timestamp); so we acquire a read
+        // latch at max timestamp. See
+        // https://github.com/cockroachdb/cockroach/issues/102264.
+        //
+        // We don't need to duplicate the special case for replicated shared
+        // locks here (see ReplicatedSharedLocksTransactionLatchingKey) because
+        // there is no risk in these tests of two shared lock acquisitions from
+        // the same transaction clobbering each other's state.
+        return spanset.SpanReadOnly, hlc.MaxTimestamp
+    case lock.Exclusive:
+        return spanset.SpanReadWrite, ts
+    case lock.Intent:
+        return spanset.SpanReadWrite, ts
+    default:
+        panic(fmt.Sprintf("unsupported lock strength: %s", str))
+    }
+}
 
 func ScanIsoLevel(t *testing.T, d *datadriven.TestData) isolation.Level {
     const key = "iso"
     if !d.HasArg(key) {
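Note: the mapping this new helper encodes can be read directly off the switch above. Below is a minimal sanity-check sketch, not part of this PR, written as a table-driven test in the same package (it assumes the imports already present in this file):

```go
// Sketch only: checks latchAccessForLockStrength's strength -> (access,
// timestamp) mapping. lock.Shared is the one strength that overrides the
// request timestamp with hlc.MaxTimestamp.
func TestLatchAccessForLockStrength(t *testing.T) {
    ts := hlc.Timestamp{WallTime: 10}
    for _, tc := range []struct {
        str   lock.Strength
        expSA spanset.SpanAccess
        expTS hlc.Timestamp
    }{
        {lock.None, spanset.SpanReadOnly, ts},
        {lock.Shared, spanset.SpanReadOnly, hlc.MaxTimestamp},
        {lock.Exclusive, spanset.SpanReadWrite, ts},
        {lock.Intent, spanset.SpanReadWrite, ts},
    } {
        sa, latchTs := latchAccessForLockStrength(tc.str, ts)
        if sa != tc.expSA || latchTs != tc.expTS {
            t.Errorf("%v: got (%v, %v), want (%v, %v)", tc.str, sa, latchTs, tc.expSA, tc.expTS)
        }
    }
}
```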
@@ -1060,7 +1073,7 @@ type workItem struct {

     // Request.
     request *Request
-    locksToAcquire []roachpb.Key
+    locksToAcquire []lockToAcquire
 
     // Update locks.
     intents []roachpb.LockUpdate
@@ -1080,6 +1093,16 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
     if item.request != nil {
         var lg *spanlatch.Guard
         var g lockTableGuard
+        defer func() {
+            if lg != nil {
+                e.lm.Release(lg)
+                lg = nil
+            }
+            if g != nil {
+                e.lt.Dequeue(g)
+                g = nil
+            }
+        }()
         var err error
         for {
             // Since we can't do a select involving latch acquisition and context
@@ -1099,6 +1122,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
                 break
             }
             e.lm.Release(lg)
+            lg = nil
             var lastID uuid.UUID
         L:
             for {
@@ -1116,14 +1140,12 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
                     if !lastID.Equal(uuid.UUID{}) && item.request.Txn != nil {
                         _, err = e.waitingFor(item.request.Txn.ID, lastID, uuid.UUID{})
                         if err != nil {
-                            e.lt.Dequeue(g)
                             return err
                         }
                     }
                     break L
                 case waitSelf:
                     if item.request.Txn == nil {
-                        e.lt.Dequeue(g)
                         return errors.Errorf("non-transactional request cannot waitSelf")
                     }
                 case waitForDistinguished, waitFor, waitElsewhere:
@@ -1134,7 +1156,6 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
                         lastID = state.txn.ID
                     }
                     if aborted {
-                        e.lt.Dequeue(g)
                         return err
                     }
                 }
@@ -1144,15 +1165,13 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
             }
         }
 
-        // acquire locks.
-        for _, k := range item.locksToAcquire {
-            err = e.acquireLock(item.request.Txn, k)
+        // Acquire locks.
+        for _, toAcq := range item.locksToAcquire {
+            err = e.acquireLock(item.request.Txn, toAcq)
             if err != nil {
                 break
             }
         }
-        e.lt.Dequeue(g)
-        e.lm.Release(lg)
         return err
     }
     for i := range item.intents {
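Note: the e.lt.Dequeue(g) and e.lm.Release(lg) calls deleted in the hunks above are all subsumed by the single defer added at the top of doWork, which nils each guard after releasing it. A hedged sketch of that idiom with placeholder types (not CockroachDB APIs):

```go
// Sketch only: release a guard exactly once on every exit path. acquire
// returns a release callback; work stands in for the body of doWork.
func doWithCleanup(acquire func() (func(), error), work func() error) (err error) {
    var release func()
    defer func() {
        if release != nil {
            release() // runs on every early return below
        }
    }()
    if release, err = acquire(); err != nil {
        return err
    }
    if err = work(); err != nil {
        return err // the deferred release still fires
    }
    release()
    release = nil // released explicitly; make the deferred cleanup a no-op
    return nil
}
```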
@@ -1169,12 +1188,19 @@ type workloadItem struct {
     // Request to be executed, iff request != nil
     request *Request
     // locks to be acquired by the request.
-    locksToAcquire []roachpb.Key
+    locksToAcquire []lockToAcquire
 
     // Non-empty when transaction should release locks.
     finish uuid.UUID
 }
+
+// lockToAcquire is a lock that should be acquired by a request.
+type lockToAcquire struct {
+    key roachpb.Key
+    str lock.Strength
+    dur lock.Durability
+}
 
 // state of a transaction maintained by workloadExecutor, for deadlock
 // detection, and deciding when transaction can be finished (when a
 // workloadItem has instructed that it be finished and all its ongoing
@@ -1245,8 +1271,8 @@ func newWorkLoadExecutor(items []workloadItem, concurrency int) *workloadExecutor {
     }
 }
 
-func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, k roachpb.Key) error {
-    acq := roachpb.MakeLockAcquisition(txn, k, lock.Unreplicated, lock.Exclusive)
+func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, toAcq lockToAcquire) error {
+    acq := roachpb.MakeLockAcquisition(txn, toAcq.key, toAcq.dur, toAcq.str)
     err := e.lt.AcquireLock(&acq)
     if err != nil {
         return err
@@ -1257,7 +1283,7 @@ func (e *workloadExecutor) acquireLock(txn *roachpb.Transaction, k roachpb.Key)
     if !ok {
         return errors.Errorf("testbug: lock acquiring request with txnID %v has no transaction", txn.ID)
     }
-    tstate.acquiredLocks = append(tstate.acquiredLocks, k)
+    tstate.acquiredLocks = append(tstate.acquiredLocks, toAcq.key)
     return nil
 }
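Note: with lockToAcquire threaded through, one call site can now express every supported acquisition. A hedged usage sketch (the keys and the exampleAcquisitions wrapper are hypothetical; the argument order of MakeLockAcquisition follows the call above):

```go
// Sketch only: three acquisitions the updated workload can generate.
func exampleAcquisitions(e *workloadExecutor, txn *roachpb.Transaction) error {
    for _, toAcq := range []lockToAcquire{
        {roachpb.Key("a"), lock.Shared, lock.Unreplicated},
        {roachpb.Key("b"), lock.Exclusive, lock.Replicated},
        {roachpb.Key("c"), lock.Intent, lock.Replicated}, // intents are always replicated
    } {
        if err := e.acquireLock(txn, toAcq); err != nil {
            return err
        }
    }
    return nil
}
```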

@@ -1458,6 +1484,7 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) {
     for i := 0; i < 10; i++ {
         keys = append(keys, roachpb.Key(string(rune('a'+i))))
     }
+    strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
     rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))
 
     const numKeys = 2
@@ -1472,12 +1499,9 @@ func TestLockTableConcurrentSingleRequests(t *testing.T) {
lockSpans := &lockspanset.LockSpanSet{}
for i := 0; i < numKeys; i++ {
span := roachpb.Span{Key: keys[keysPerm[i]]}
acc := spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess)))
str := lock.None
if acc == spanset.SpanReadWrite {
str = lock.Intent
}
latchSpans.AddMVCC(acc, span, ts)
str := strs[rand.Intn(len(strs))]
sa, latchTs := latchAccessForLockStrength(str, ts)
latchSpans.AddMVCC(sa, span, latchTs)
lockSpans.Add(str, span)
}
var txn *roachpb.Transaction
@@ -1537,6 +1561,9 @@ func TestLockTableConcurrentRequests(t *testing.T) {
     for i := 0; i < 10; i++ {
         keys = append(keys, roachpb.Key(string(rune('a'+i))))
     }
+    // TODO(nvanbenschoten): add lock.Shared back in once #111144 is fixed.
+    //strs := []lock.Strength{lock.None, lock.Shared, lock.Exclusive, lock.Intent}
+    strs := []lock.Strength{lock.None, lock.Exclusive, lock.Intent}
     rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))
     const numActiveTxns = 8
     var activeTxns [numActiveTxns]*enginepb.TxnMeta
@@ -1585,27 +1612,33 @@ func TestLockTableConcurrentRequests(t *testing.T) {
         wi := workloadItem{request: request}
         for i := 0; i < numKeys; i++ {
             span := roachpb.Span{Key: keys[keysPerm[i]]}
-            acc := spanset.SpanReadOnly
+
             str := lock.None
-            dupRead := false
-            if !onlyReads {
-                acc = spanset.SpanAccess(rng.Intn(int(spanset.NumSpanAccess)))
-                if acc == spanset.SpanReadWrite && txnMeta != nil && rng.Intn(2) == 0 {
-                    // Acquire lock.
-                    wi.locksToAcquire = append(wi.locksToAcquire, span.Key)
-                    str = lock.Intent
-                }
-                if acc == spanset.SpanReadWrite && rng.Intn(2) == 0 {
-                    // Also include the key as read.
-                    dupRead = true
-                    str = lock.Intent
-                }
+            if !onlyReads && txnMeta != nil {
+                // Randomly select a lock strength (including lock.None).
+                str = strs[rng.Intn(len(strs))]
             }
-            latchSpans.AddMVCC(acc, span, ts)
+            sa, latchTs := latchAccessForLockStrength(str, ts)
+            latchSpans.AddMVCC(sa, span, latchTs)
             lockSpans.Add(str, span)
+            if str != lock.None {
+                // Randomly select a lock durability. Shared and Exclusive locks
+                // can be unreplicated, but Intent can not.
+                dur := lock.Replicated
+                if str != lock.Intent && rng.Intn(2) == 0 {
+                    dur = lock.Unreplicated
+                }
+                toAcq := lockToAcquire{span.Key, str, dur}
+                wi.locksToAcquire = append(wi.locksToAcquire, toAcq)
+            }
+
+            dupRead := str != lock.None && rng.Intn(2) == 0
             if dupRead {
-                latchSpans.AddMVCC(spanset.SpanReadOnly, span, ts)
-                lockSpans.Add(lock.None, span)
+                // Also include the key as a non-locking read.
+                str = lock.None
+                sa, latchTs := latchAccessForLockStrength(str, ts)
+                latchSpans.AddMVCC(sa, span, latchTs)
+                lockSpans.Add(str, span)
             }
         }
         items = append(items, wi)
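Note: the strength and durability randomization above only ever produces valid pairs. Spelled out as a hypothetical helper (a sketch, not in the PR): Shared and Exclusive locks may take either durability, Intent locks are always replicated, and lock.None acquires nothing.

```go
// Sketch only: the (strength, durability) combinations the workload can emit.
func validDurabilities(str lock.Strength) []lock.Durability {
    switch str {
    case lock.Shared, lock.Exclusive:
        // Either durability; the generator flips a coin via rng.Intn(2).
        return []lock.Durability{lock.Replicated, lock.Unreplicated}
    case lock.Intent:
        // Intents are the replicated write path and are never unreplicated.
        return []lock.Durability{lock.Replicated}
    default:
        // lock.None is a non-locking read; no lock is acquired.
        return nil
    }
}
```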
@@ -1620,7 +1653,10 @@ func TestLockTableConcurrentRequests(t *testing.T) {
     t.Run(fmt.Sprintf("concurrency %d", c), func(t *testing.T) {
         exec := newWorkLoadExecutor(items, c)
         if err := exec.execute(false, 200); err != nil {
-            t.Fatal(err)
+            // TODO(nvanbenschoten): remove this once #110435 is fixed.
+            if !testutils.IsError(err, "lock promotion from Shared to .* is not allowed") {
+                t.Fatal(err)
+            }
         }
     })
 }
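Note: testutils.IsError matches a non-nil error against a regexp, so the block above tolerates only the known #110435 lock-promotion failure and still fails the test on anything else. If more call sites need the same escape hatch, it could be factored into a helper along these lines (a sketch, not part of the PR):

```go
// Sketch only: fail the test unless err is nil or matches a known pattern.
func fatalUnlessKnownErr(t *testing.T, err error, knownRE string) {
    t.Helper()
    if err == nil || testutils.IsError(err, knownRE) {
        return // no error, or a known failure that is tolerated for now
    }
    t.Fatal(err)
}
```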