Skip to content

Commit

Permalink
concurrency: datastructure changes to enable multiple lock strengths
Browse files Browse the repository at this point in the history
Prior to this patch, any unreplicated lock was assumed to be held with
lock strength Exclusive. This patch enables us to represent other lock
strengths in the lock table, which will allow us to support shared locks
in the near future.

There's a small functional change in this commit. Previously, we'd track
all sequence numbers an unreplicated lock is held at in the lock table.
As described in the shared locks RFC, this tracking is superflous --
instead, it's sufficient to only track the lowest sequence number (that
hasn't been rolled back) with which a lock is held. We do so now.

Closes cockroachdb#102270

Epic: none

Release note: None
  • Loading branch information
arulajmani committed Jul 25, 2023
1 parent 8815db3 commit f56e347
Show file tree
Hide file tree
Showing 34 changed files with 619 additions and 565 deletions.
200 changes: 127 additions & 73 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,37 +923,105 @@ type queuedGuard struct {

// Information about a lock holder for unreplicated locks.
type unreplicatedLockHolderInfo struct {
// Lock strength is always lock.Exclusive.

// All the TxnSeqs in the current epoch at which this lock has been acquired,
// in increasing order. We track these so that if a lock is acquired at both
// seq 5 and seq 7, rollback of 7 does not cause the lock to be released. This
// consistent with PostgreSQL semantics; see:
// https://www.postgresql.org/docs/12/sql-select.html#SQL-FOR-UPDATE-SHARE
seqs []enginepb.TxnSeq
// infos tracks whether the lock is held with a particular strength; if it is,
// the lowest sequence number (that hasn't been rolled back) that it was
// acquired with is stored. If the lock isn't held with a particular strength,
// a sentinel value (-1) is stored.
//
// NB: Intents cannot be held/acquired in unreplicated fashion; thus the
// highest lock strength for unreplicated locks is Exclusive.
infos [lock.Exclusive + 1]enginepb.TxnSeq

// The timestamp at which the unreplicated lock is held. Must not regress.
ts hlc.Timestamp
}

// init initializes an unreplicatedLockHolderInfo struct.
func (ulh *unreplicatedLockHolderInfo) init() {
for str := lock.None; str <= lock.Exclusive; str++ {
ulh.infos[str] = -1
}
}

// clear removes previously tracked unreplicated lock holder information.
func (ulh *unreplicatedLockHolderInfo) clear() {
ulh.seqs = nil
for str := lock.None; str <= lock.Exclusive; str++ {
ulh.infos[str] = -1
}
ulh.ts = hlc.Timestamp{}
}

// epochBumped is called when a transaction is known to have its epoch bumped.
// State specific to the previous epoch is cleared out.
func (ulh *unreplicatedLockHolderInfo) epochBumped() {
for str := lock.None; str <= lock.Exclusive; str++ {
ulh.infos[str] = -1
}
}

// acquire updates tracking on the receiver, if necessary[1], to denote the lock
// is held with the supplied lock strength and sequence number.
//
// [1] We only track the lowest (non-rolled back) sequence number with which a
// lock is held, as doing so is sufficient.
func (ulh *unreplicatedLockHolderInfo) acquire(str lock.Strength, seqNum enginepb.TxnSeq) {
if !ulh.held(str) ||
// ... OR we've been made aware of an acquisition at a lower sequence number
ulh.infos[str] > seqNum {
ulh.infos[str] = seqNum
}
}

// held returns true if the receiver is held with the supplied lock strength.
func (ulh *unreplicatedLockHolderInfo) held(str lock.Strength) bool {
return ulh.infos[str] != -1
}

// rollbackIgnoredSeqNumbers mutates the receiver to rollback any locks that are
// known to be held at sequence numbers that are known to be rolled back.
func (ulh *unreplicatedLockHolderInfo) rollbackIgnoredSeqNumbers(
ignoredSeqNums []enginepb.IgnoredSeqNumRange,
) {
if len(ignoredSeqNums) == 0 {
return
}
for str := lock.None; str <= lock.Exclusive; str++ {
if ulh.infos[str] == -1 {
continue
}
i := sort.Search(len(ignoredSeqNums), func(i int) bool { return ignoredSeqNums[i].End >= ulh.infos[str] })
shouldIgnore := i != len(ignoredSeqNums) && ulh.infos[str] >= ignoredSeqNums[i].Start
if shouldIgnore {
ulh.infos[str] = -1
}
}
}

func (ulh *unreplicatedLockHolderInfo) isEmpty() bool {
return ulh.seqs == nil && ulh.ts.IsEmpty()
for str := lock.None; str <= lock.Exclusive; str++ {
if ulh.held(str) { // lock is held
return false
}
}
assert(ulh.ts.IsEmpty(), "lock not held, timestamp should be empty")
return true
}

func (ulh *unreplicatedLockHolderInfo) safeFormat(sb *redact.StringBuilder) {
if ulh.isEmpty() {
return
}
sb.SafeString("unrepl ")
sb.Printf("seqs: [%d", redact.Safe(ulh.seqs[0]))
for j := 1; j < len(ulh.seqs); j++ {
sb.Printf(", %d", redact.Safe(ulh.seqs[j]))
sb.SafeString("unrepl [")
first := true
for str := lock.None; str <= lock.Exclusive; str++ {
if !ulh.held(str) {
continue
}
if !first {
sb.Printf(", ")
}
first = false
sb.Printf("(str: %s seq: %d)", redact.Safe(str), redact.Safe(ulh.infos[str]))
}
sb.SafeString("]")
}
Expand Down Expand Up @@ -1728,7 +1796,24 @@ func (l *lockState) getLockMode() lock.Mode {
if l.isHeldReplicated() {
return lock.MakeModeIntent(lockHolderTS)
}
return lock.MakeModeExclusive(lockHolderTS, lockHolderTxn.IsoLevel)

// Iterate from the strongest to weakest lock strength.
for str := lock.Exclusive; str >= lock.None; str-- {
if !l.holder.unreplicatedInfo.held(str) {
continue
}
switch str {
case lock.Exclusive:
return lock.MakeModeExclusive(lockHolderTS, lockHolderTxn.IsoLevel)
case lock.Update:
panic(fmt.Sprintf("unhandled lock strength %s", lock.Update))
case lock.Shared:
panic(fmt.Sprintf("unhandled lock strength %s", lock.Shared))
case lock.None:
panic(fmt.Sprintf("unhandled lock strength %s", lock.None))
}
}
panic("unreachable")
}

// Removes the current lock holder from the lock.
Expand Down Expand Up @@ -2294,22 +2379,17 @@ func (l *lockState) acquireLock(acq *roachpb.LockAcquisition, clock *hlc.Clock)
if acq.Durability == lock.Unreplicated && l.isHeldUnreplicated() {
switch {
case l.holder.txn.Epoch < acq.Txn.Epoch: // at a higher epoch
// Clear sequence numbers from the older epoch.
l.holder.unreplicatedInfo.seqs = l.holder.unreplicatedInfo.seqs[:0]
// Clear sequence number tracking from the older epoch.
l.holder.unreplicatedInfo.epochBumped()
case l.holder.txn.Epoch == acq.Txn.Epoch: // at the same epoch
// Prune the list of sequence numbers tracked for this lock by removing
// any sequence numbers that are considered ignored by virtue of a
// savepoint rollback.
//
// Note that the in-memory lock table is the source of truth for just
// unreplicated locks, so we only do this pruning for unreplicated lock
// acquisition. On the other hand, for replicated locks, the source of
// truth is what's written in MVCC. We could try and mimic that logic
// here, but we choose not to, as doing so is error-prone/difficult to
// maintain.
l.holder.unreplicatedInfo.seqs = removeIgnored(
l.holder.unreplicatedInfo.seqs, acq.IgnoredSeqNums,
)
// unreplicated locks, and as such, sequence numbers are only tracked
// for them.
l.holder.unreplicatedInfo.rollbackIgnoredSeqNumbers(acq.IgnoredSeqNums)
case l.holder.txn.Epoch > acq.Txn.Epoch: // at a prior epoch
// Reject the request; the logic here parallels how mvccPutInternal
// handles this case for intents.
Expand Down Expand Up @@ -2365,7 +2445,7 @@ func (l *lockState) acquireLock(acq *roachpb.LockAcquisition, clock *hlc.Clock)
switch acq.Durability {
case lock.Unreplicated:
l.holder.unreplicatedInfo.ts.Forward(acq.Txn.WriteTimestamp)
l.holder.unreplicatedInfo.seqs = append(l.holder.unreplicatedInfo.seqs, acq.Txn.Sequence)
l.holder.unreplicatedInfo.acquire(acq.Strength, acq.Txn.Sequence)
case lock.Replicated:
l.holder.replicatedInfo.ts.Forward(acq.Txn.WriteTimestamp)
default:
Expand Down Expand Up @@ -2453,7 +2533,7 @@ func (l *lockState) acquireLock(acq *roachpb.LockAcquisition, clock *hlc.Clock)
switch acq.Durability {
case lock.Unreplicated:
l.holder.unreplicatedInfo.ts = acq.Txn.WriteTimestamp
l.holder.unreplicatedInfo.seqs = append([]enginepb.TxnSeq(nil), acq.Txn.Sequence)
l.holder.unreplicatedInfo.acquire(acq.Strength, acq.Txn.Sequence)
case lock.Replicated:
l.holder.replicatedInfo.ts = acq.Txn.WriteTimestamp
default:
Expand All @@ -2475,35 +2555,17 @@ func (l *lockState) isIdempotentLockAcquisition(acq *roachpb.LockAcquisition) bo
assert(txn.ID == acq.Txn.ID, "existing lock transaction is different from the acquisition")
switch acq.Durability {
case lock.Unreplicated:
seqs := l.holder.unreplicatedInfo.seqs
// Cheaply check if this could be an idempotent lock acquisition.
if len(seqs) > 0 && seqs[len(seqs)-1] >= acq.Txn.Sequence {
// Idempotent lock acquisition. In this case, we simply ignore the lock
// acquisition as long as it corresponds to an existing sequence number.
// If the sequence number is not being tracked yet, insert it into the
// sequence history. The validity of such a lock re-acquisition should
// have already been determined at the MVCC level.

if i := sort.Search(len(seqs), func(i int) bool {
return seqs[i] >= acq.Txn.Sequence
}); i == len(seqs) {
panic("lockTable bug - search value <= last element")
} else if seqs[i] != acq.Txn.Sequence {
// TODO(arul): Once we change the lockState datastructure to only track
// the highest sequence number, we should remove all mutations happening
// inside this function.
seqs = append(seqs, 0)
copy(seqs[i+1:], seqs[i:])
seqs[i] = acq.Txn.Sequence
l.holder.unreplicatedInfo.seqs = seqs
}
if !l.holder.unreplicatedInfo.held(acq.Strength) { // unheld lock
return false
}
// Lock is being re-acquired at a higher sequence number when it's already
// held at a lower sequence number.
return l.holder.unreplicatedInfo.infos[acq.Strength] <= acq.Txn.Sequence &&
// NB: Lock re-acquisitions at different timestamps are not considered
// idempotent. Strictly speaking, we could tighten this condition to
// consider lock re-acquisition at lower timestamps idempotent, as a
// lock's timestamp at a given durability never regresses.
return l.holder.unreplicatedInfo.ts.Equal(acq.Txn.WriteTimestamp)
}
return false
l.holder.unreplicatedInfo.ts.Equal(acq.Txn.WriteTimestamp)
case lock.Replicated:
// NB: Lock re-acquisitions at different timestamps are not considered
// idempotent. Strictly speaking, we could tighten this condition to
Expand Down Expand Up @@ -2687,25 +2749,6 @@ func (l *lockState) tryClearLock(force bool) bool {
return true
}

// Removes the TxnSeqs in heldSeqNums that are contained in ignoredSeqNums.
// REQUIRES: ignoredSeqNums contains non-overlapping ranges and sorted in
// increasing seq order.
func removeIgnored(
heldSeqNums []enginepb.TxnSeq, ignoredSeqNums []enginepb.IgnoredSeqNumRange,
) []enginepb.TxnSeq {
if len(ignoredSeqNums) == 0 {
return heldSeqNums
}
held := heldSeqNums[:0]
for _, n := range heldSeqNums {
i := sort.Search(len(ignoredSeqNums), func(i int) bool { return ignoredSeqNums[i].End >= n })
if i == len(ignoredSeqNums) || ignoredSeqNums[i].Start > n {
held = append(held, n)
}
}
return held
}

// Tries to update the lock: noop if this lock is held by a different
// transaction, else the lock is updated. Returns whether the lockState can be
// garbage collected, and whether it was held by the txn.
Expand Down Expand Up @@ -2762,8 +2805,17 @@ func (l *lockState) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo

// ...update corresponds to the current epoch.
case txn.Epoch == l.holder.txn.Epoch:
l.holder.unreplicatedInfo.seqs = removeIgnored(l.holder.unreplicatedInfo.seqs, up.IgnoredSeqNums)
if len(l.holder.unreplicatedInfo.seqs) == 0 {
l.holder.unreplicatedInfo.rollbackIgnoredSeqNumbers(up.IgnoredSeqNums)
// Check if the lock is still held after rolling back ignored sequence
// numbers.
held := false
for str := lock.None; str <= lock.Exclusive; str++ {
if l.holder.unreplicatedInfo.held(str) {
held = true
break
}
}
if !held {
l.holder.unreplicatedInfo.clear()
isLocked = false
break
Expand Down Expand Up @@ -3282,6 +3334,7 @@ func (t *lockTableImpl) AddDiscoveredLock(
l = &lockState{id: lockSeqNum, key: key}
l.queuedWriters.Init()
l.waitingReaders.Init()
l.holder.unreplicatedInfo.init()
t.locks.Set(l)
atomic.AddInt64(&t.locks.numLocks, 1)
} else {
Expand Down Expand Up @@ -3347,6 +3400,7 @@ func (t *lockTableImpl) AcquireLock(acq *roachpb.LockAcquisition) error {
l = &lockState{id: lockSeqNum, key: acq.Key}
l.queuedWriters.Init()
l.waitingReaders.Init()
l.holder.unreplicatedInfo.init()
t.locks.Set(l)
atomic.AddInt64(&t.locks.numLocks, 1)
} else {
Expand Down
12 changes: 6 additions & 6 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1886,15 +1886,15 @@ func TestLockStateSafeFormat(t *testing.T) {
}
l.holder.txn = &enginepb.TxnMeta{ID: uuid.NamespaceDNS}
// TODO(arul): add something about replicated locks here too.
l.holder.unreplicatedInfo = unreplicatedLockHolderInfo{
ts: hlc.Timestamp{WallTime: 123, Logical: 7},
seqs: []enginepb.TxnSeq{1},
}
l.holder.unreplicatedInfo.init()
l.holder.unreplicatedInfo.ts = hlc.Timestamp{WallTime: 123, Logical: 7}
l.holder.unreplicatedInfo.infos[lock.Exclusive] = 1
l.holder.unreplicatedInfo.infos[lock.Shared] = 3
require.EqualValues(t,
" lock: ‹\"KEY\"\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8 epoch: 0, iso: Serializable, ts: 0.000000123,7, info: unrepl seqs: [1]\n",
" lock: ‹\"KEY\"\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8 epoch: 0, iso: Serializable, ts: 0.000000123,7, info: unrepl [(str: Shared seq: 3), (str: Exclusive seq: 1)]\n",
redact.Sprint(l))
require.EqualValues(t,
" lock: ‹×›\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8 epoch: 0, iso: Serializable, ts: 0.000000123,7, info: unrepl seqs: [1]\n",
" lock: ‹×›\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8 epoch: 0, iso: Serializable, ts: 0.000000123,7, info: unrepl [(str: Shared seq: 3), (str: Exclusive seq: 1)]\n",
redact.Sprint(l).Redact())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ debug-lock-table
----
num=1
lock: "k"
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 12.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 12.000000000,1, info: unrepl [(str: Exclusive seq: 0)]

finish req=req2
----
Expand All @@ -68,7 +68,7 @@ debug-lock-table
----
num=1
lock: "k"
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 12.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 12.000000000,1, info: unrepl [(str: Exclusive seq: 0)]

reset
----
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,9 +342,9 @@ num=4
lock: "b"
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: repl
lock: "g"
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]
lock: "h"
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]

sequence req=req1
----
Expand All @@ -368,9 +368,9 @@ num=4
lock: "b"
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: repl
lock: "g"
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]
lock: "h"
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000002-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]

debug-advance-clock ts=123
----
Expand Down Expand Up @@ -531,9 +531,9 @@ debug-lock-table
----
num=5
lock: "a"
holder: txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]
lock: "b"
holder: txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]
lock: "c"
holder: txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: repl
queued writers:
Expand Down Expand Up @@ -568,12 +568,12 @@ debug-lock-table
----
num=5
lock: "a"
holder: txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]
waiting readers:
req: 8, txn: 00000002-0000-0000-0000-000000000000
distinguished req: 8
lock: "b"
holder: txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]
lock: "c"
holder: txn: 00000003-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: repl
queued writers:
Expand Down Expand Up @@ -613,7 +613,7 @@ debug-lock-table
----
num=4
lock: "b"
holder: txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl seqs: [0]
holder: txn: 00000004-0000-0000-0000-000000000000 epoch: 0, iso: Serializable, ts: 10.000000000,1, info: unrepl [(str: Exclusive seq: 0)]
waiting readers:
req: 8, txn: 00000002-0000-0000-0000-000000000000
distinguished req: 8
Expand Down
Loading

0 comments on commit f56e347

Please sign in to comment.