Skip to content

Commit

Permalink
concurrency: datastructure changes to enable multiple lock strengths
Browse files Browse the repository at this point in the history
Prior to this patch, any unreplicated lock was assumed to be held with
lock strength Exclusive. This patch enables us to represent other lock
strengths in the lock table, which will allow us to support shared locks
in the near future.

Closes #102270

Release note: None
  • Loading branch information
arulajmani committed Jul 28, 2023
1 parent 5b3dd52 commit f1d6a25
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 34 deletions.
138 changes: 107 additions & 31 deletions pkg/kv/kvserver/concurrency/lock_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -923,17 +923,44 @@ type queuedGuard struct {

// Information about a lock holder for unreplicated locks.
type unreplicatedLockHolderInfo struct {
// Strength is always lock.Exclusive.

// The lowest sequence number (that hasn't been rolled back) that the
// Exclusive lock was acquired with. If the lock isn't held, a sentinel value
// (-1) is stored.
seq enginepb.TxnSeq
// strengths tracks whether the lock is held with a particular strength; if it
// is, the lowest sequence number (that hasn't been rolled back) that it was
// acquired with is stored. If the lock isn't held with a particular strength,
// a sentinel value (-1) is stored.
//
// NB: Intents cannot be held/acquired in unreplicated fashion; thus the
// highest lock strength for unreplicated locks is Exclusive.
strengths [len(unreplicatedHolderStrengths)]enginepb.TxnSeq

// The timestamp at which the unreplicated lock is held. Must not regress.
ts hlc.Timestamp
}

// Fixed length slice for all supported lock strengths for unreplicated locks.
// May be used to iterate supported lock strengths in strength order (strongest
// to weakest).
var unreplicatedHolderStrengths = [...]lock.Strength{lock.Exclusive, lock.Shared}

// unreplicatedLockHolderStrengthToIndexMap returns a mapping between
// (strength, index) pairs that can be used to index into the
// unreplicatedLockHolderInfo.strengths array.
//
// Trying to use a lock strength that isn't supported with unreplicated locks to
// index into the unreplicatedLockHolderInfo.strengths array will cause a
// runtime error.
var unreplicatedLockHolderStrengthToIndexMap = func() [lock.MaxStrength + 1]int {
var m [lock.MaxStrength + 1]int
// Initialize all to -1.
for str := range m {
m[str] = -1
}
// Set the indices of the valid strengths.
for i, str := range unreplicatedHolderStrengths {
m[str] = i
}
return m
}()

// init initializes an unreplicatedLockHolderInfo struct.
func (ulh *unreplicatedLockHolderInfo) init() {
ulh.resetStrengths()
Expand All @@ -953,33 +980,42 @@ func (ulh *unreplicatedLockHolderInfo) epochBumped() {
}

func (ulh *unreplicatedLockHolderInfo) resetStrengths() {
ulh.seq = -1
for strIdx := range ulh.strengths {
ulh.strengths[strIdx] = -1
}
}

// minSeqNumber returns the minimum sequence number the lock is held with given
// the supplied lock strength. -1 is returned if the lock is not held with the
// supplied lock strength.
func (ulh *unreplicatedLockHolderInfo) minSeqNumber(str lock.Strength) enginepb.TxnSeq {
return ulh.strengths[unreplicatedLockHolderStrengthToIndexMap[str]]
}

// acquire updates tracking on the receiver, if necessary[1], to denote the lock
// is held with Exclusive lock strength at the supplied sequence number.
// is held with the supplied lock strength at the supplied sequence number.
//
// [1] We only track the lowest (non-rolled back) sequence number with which a
// lock is held, as doing so is sufficient.
func (ulh *unreplicatedLockHolderInfo) acquire(seqNum enginepb.TxnSeq) error {
if ulh.held() && seqNum < ulh.seq {
func (ulh *unreplicatedLockHolderInfo) acquire(str lock.Strength, seqNum enginepb.TxnSeq) error {
if ulh.held(str) && seqNum < ulh.minSeqNumber(str) {
// If the lock is already held at the given strength, with a given sequence
// number, that sequence number is not allowed to regress. This invariant
// is relied upon by how savepoint rollbacks work, where the lock table must
// learn
return errors.Newf(
"cannot acquire lock with strength %s at seq number %d, already tracked at higher seq number %d",
lock.Exclusive, seqNum, ulh.seq)
str, seqNum, ulh.minSeqNumber(str))
}
if !ulh.held() {
ulh.seq = seqNum
if !ulh.held(str) {
ulh.strengths[unreplicatedLockHolderStrengthToIndexMap[str]] = seqNum
}
return nil
}

// held returns true if the receiver is held with the supplied lock strength.
func (ulh *unreplicatedLockHolderInfo) held() bool {
return ulh.seq != -1
func (ulh *unreplicatedLockHolderInfo) held(str lock.Strength) bool {
return ulh.minSeqNumber(str) != -1
}

// rollbackIgnoredSeqNumbers mutates the receiver to rollback any locks that are
Expand All @@ -990,19 +1026,25 @@ func (ulh *unreplicatedLockHolderInfo) rollbackIgnoredSeqNumbers(
if len(ignoredSeqNums) == 0 {
return
}
if ulh.seq == -1 {
return
}
i := sort.Search(len(ignoredSeqNums), func(i int) bool { return ignoredSeqNums[i].End >= ulh.seq })
shouldIgnore := i != len(ignoredSeqNums) && ulh.seq >= ignoredSeqNums[i].Start
if shouldIgnore {
ulh.seq = -1
for strIdx, minSeqNumber := range ulh.strengths {
if minSeqNumber == -1 {
continue
}
i := sort.Search(len(ignoredSeqNums), func(i int) bool {
return ignoredSeqNums[i].End >= minSeqNumber
})
shouldIgnore := i != len(ignoredSeqNums) && minSeqNumber >= ignoredSeqNums[i].Start
if shouldIgnore {
ulh.strengths[strIdx] = -1
}
}
}

func (ulh *unreplicatedLockHolderInfo) isEmpty() bool {
if ulh.held() { // lock is held
return false
for _, str := range unreplicatedHolderStrengths {
if ulh.held(str) { // lock is held
return false
}
}
assert(ulh.ts.IsEmpty(), "lock not held, timestamp should be empty")
return true
Expand All @@ -1013,7 +1055,21 @@ func (ulh *unreplicatedLockHolderInfo) safeFormat(sb *redact.StringBuilder) {
return
}
sb.SafeString("unrepl [")
sb.Printf("(str: %s seq: %d)", redact.Safe(lock.Exclusive), redact.Safe(ulh.seq))
first := true
for _, str := range unreplicatedHolderStrengths {
if !ulh.held(str) {
continue
}
if !first {
sb.Printf(", ")
}
first = false
sb.Printf(
"(str: %s seq: %d)",
redact.Safe(str),
redact.Safe(ulh.minSeqNumber(str)),
)
}
sb.SafeString("]")
}

Expand Down Expand Up @@ -1792,7 +1848,20 @@ func (l *lockState) getLockMode() lock.Mode {
if l.isHeldReplicated() {
return lock.MakeModeIntent(lockHolderTS)
}
return lock.MakeModeExclusive(lockHolderTS, lockHolderTxn.IsoLevel)
for _, str := range unreplicatedHolderStrengths {
if !l.holder.unreplicatedInfo.held(str) {
continue
}
switch str {
case lock.Exclusive:
return lock.MakeModeExclusive(lockHolderTS, lockHolderTxn.IsoLevel)
case lock.Shared:
panic(fmt.Sprintf("unexpected lock strength %s", str))
default:
panic(fmt.Sprintf("unexpected lock strength %s", str))
}
}
panic("unreachable")
}

// Removes the current lock holder from the lock.
Expand Down Expand Up @@ -2423,7 +2492,7 @@ func (l *lockState) acquireLock(acq *roachpb.LockAcquisition, clock *hlc.Clock)
switch acq.Durability {
case lock.Unreplicated:
l.holder.unreplicatedInfo.ts.Forward(acq.Txn.WriteTimestamp)
if err := l.holder.unreplicatedInfo.acquire(acq.Txn.Sequence); err != nil {
if err := l.holder.unreplicatedInfo.acquire(acq.Strength, acq.Txn.Sequence); err != nil {
return err
}
case lock.Replicated:
Expand Down Expand Up @@ -2513,7 +2582,7 @@ func (l *lockState) acquireLock(acq *roachpb.LockAcquisition, clock *hlc.Clock)
switch acq.Durability {
case lock.Unreplicated:
l.holder.unreplicatedInfo.ts = acq.Txn.WriteTimestamp
if err := l.holder.unreplicatedInfo.acquire(acq.Txn.Sequence); err != nil {
if err := l.holder.unreplicatedInfo.acquire(acq.Strength, acq.Txn.Sequence); err != nil {
return err
}
case lock.Replicated:
Expand All @@ -2537,12 +2606,12 @@ func (l *lockState) isIdempotentLockAcquisition(acq *roachpb.LockAcquisition) bo
assert(txn.ID == acq.Txn.ID, "existing lock transaction is different from the acquisition")
switch acq.Durability {
case lock.Unreplicated:
if !l.holder.unreplicatedInfo.held() { // unheld lock
if !l.holder.unreplicatedInfo.held(acq.Strength) { // unheld lock
return false
}
// Lock is being re-acquired at a higher sequence number when it's already
// held at a lower sequence number.
return l.holder.unreplicatedInfo.seq <= acq.Txn.Sequence &&
return l.holder.unreplicatedInfo.minSeqNumber(acq.Strength) <= acq.Txn.Sequence &&
// NB: Lock re-acquisitions at different timestamps are not considered
// idempotent. Strictly speaking, we could tighten this condition to
// consider lock re-acquisition at lower timestamps idempotent, as a
Expand Down Expand Up @@ -2790,7 +2859,14 @@ func (l *lockState) tryUpdateLockLocked(up roachpb.LockUpdate) (heldByTxn, gc bo
l.holder.unreplicatedInfo.rollbackIgnoredSeqNumbers(up.IgnoredSeqNums)
// Check if the lock is still held after rolling back ignored sequence
// numbers.
if !l.holder.unreplicatedInfo.held() {
held := false
for _, str := range unreplicatedHolderStrengths {
if l.holder.unreplicatedInfo.held(str) {
held = true
break
}
}
if !held {
l.holder.unreplicatedInfo.clear()
isLocked = false
break
Expand Down
7 changes: 4 additions & 3 deletions pkg/kv/kvserver/concurrency/lock_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1888,12 +1888,13 @@ func TestLockStateSafeFormat(t *testing.T) {
// TODO(arul): add something about replicated locks here too.
l.holder.unreplicatedInfo.init()
l.holder.unreplicatedInfo.ts = hlc.Timestamp{WallTime: 123, Logical: 7}
l.holder.unreplicatedInfo.seq = 1
require.NoError(t, l.holder.unreplicatedInfo.acquire(lock.Exclusive, 1))
require.NoError(t, l.holder.unreplicatedInfo.acquire(lock.Shared, 3))
require.EqualValues(t,
" lock: ‹\"KEY\"\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8 epoch: 0, iso: Serializable, ts: 0.000000123,7, info: unrepl [(str: Exclusive seq: 1)]\n",
" lock: ‹\"KEY\"\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8 epoch: 0, iso: Serializable, ts: 0.000000123,7, info: unrepl [(str: Exclusive seq: 1), (str: Shared seq: 3)]\n",
redact.Sprint(l))
require.EqualValues(t,
" lock: ‹×›\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8 epoch: 0, iso: Serializable, ts: 0.000000123,7, info: unrepl [(str: Exclusive seq: 1)]\n",
" lock: ‹×›\n holder: txn: 6ba7b810-9dad-11d1-80b4-00c04fd430c8 epoch: 0, iso: Serializable, ts: 0.000000123,7, info: unrepl [(str: Exclusive seq: 1), (str: Shared seq: 3)]\n",
redact.Sprint(l).Redact())
}

Expand Down

0 comments on commit f1d6a25

Please sign in to comment.