-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
lock_table_key_scanner.go
336 lines (310 loc) · 11.1 KB
/
lock_table_key_scanner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package storage
import (
"context"
"sync"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
// Fixed length slice for all supported lock strengths for replicated locks. May
// be used to iterate supported lock strengths in strength order (strongest to
// weakest).
var replicatedLockStrengths = [...]lock.Strength{lock.Intent, lock.Exclusive, lock.Shared}
func init() {
if replicatedLockStrengths[0] != lock.MaxStrength {
panic("replicatedLockStrengths[0] != lock.MaxStrength; update replicatedLockStrengths?")
}
}
// replicatedLockStrengthToIndexMap returns a mapping between (strength, index)
// pairs that can be used to index into the lockTableScanner.ownLocks array.
//
// Trying to use a lock strength that isn't supported with replicated locks to
// index into the lockTableScanner.ownLocks array will cause a runtime error.
var replicatedLockStrengthToIndexMap = func() (m [lock.MaxStrength + 1]int) {
// Initialize all to -1.
for str := range m {
m[str] = -1
}
// Set the indices of the valid strengths.
for i, str := range replicatedLockStrengths {
m[str] = i
}
return m
}()
// strongerOrEqualStrengths returns all supported lock strengths for replicated
// locks that are as strong or stronger than the provided strength. The returned
// slice is ordered from strongest to weakest.
func strongerOrEqualStrengths(str lock.Strength) []lock.Strength {
return replicatedLockStrengths[:replicatedLockStrengthToIndexMap[str]+1]
}
// minConflictLockStrength returns the minimum lock strength that conflicts with
// the provided lock strength.
func minConflictLockStrength(str lock.Strength) (lock.Strength, error) {
switch str {
case lock.None:
// Don't conflict with any locks held by other transactions.
return lock.None, nil
case lock.Shared:
return lock.Exclusive, nil
case lock.Exclusive, lock.Intent:
return lock.Shared, nil
default:
return 0, errors.AssertionFailedf(
"lockTableKeyScanner: unexpected lock strength %s", str.String())
}
}
// lockTableKeyScanner is used to scan a single key in the replicated lock
// table. It searches for locks on the key that conflict with a (transaction,
// lock strength) pair and for locks that the transaction has already acquired
// on the key.
//
// The purpose of a lockTableKeyScanner is to determine whether a transaction
// can acquire a lock on a key or perform an MVCC mutation on a key, and if so,
// what lock table keys the transaction should write to perform the operation.
type lockTableKeyScanner struct {
iter *LockTableIterator
// The transaction attempting to acquire a lock. The ID will be zero if a
// non-transactional request is attempting to perform an MVCC mutation.
txnID uuid.UUID
// Stop adding conflicting locks and abort scan once the maxConflicts limit
// is reached. Ignored if zero.
maxConflicts int64
// Stop adding conflicting locks and abort scan once the targetBytesPerConflict
// limit is reached via collected intent size. Ignored if zero.
targetBytesPerConflict int64
// Stores any error returned. If non-nil, iteration short circuits.
err error
// Stores any locks that conflict with the transaction and locking strength.
conflicts []roachpb.Lock
// Stores the total byte size of conflicts.
conflictBytes int64
// Stores any locks that the transaction has already acquired.
ownLocks [len(replicatedLockStrengths)]*enginepb.MVCCMetadata
// Avoids heap allocations.
ltKeyBuf []byte
ltValue enginepb.MVCCMetadata
firstOwnLock enginepb.MVCCMetadata
}
var lockTableKeyScannerPool = sync.Pool{
New: func() interface{} { return new(lockTableKeyScanner) },
}
// newLockTableKeyScanner creates a new lockTableKeyScanner.
//
// txnID corresponds to the ID of the transaction attempting to acquire locks.
// If txnID is valid (non-empty), locks held by the transaction with any
// strength will be accumulated into the ownLocks array. Otherwise, if txnID is
// empty, the request is non-transactional and no locks will be accumulated into
// the ownLocks array.
//
// str is the strength of the lock that the transaction (or non-transactional
// request) is attempting to acquire. The scanner will search for locks held by
// other transactions that conflict with this strength[1].
//
// maxConflicts is the maximum number of conflicting locks that the scanner
// should accumulate before returning an error. If maxConflicts is zero, the
// scanner will accumulate all conflicting locks.
//
// [1] It's valid to pass in lock.None for str. lock.None doesn't conflict with
// any other replicated locks; as such, passing lock.None configures the scanner
// to only return locks from the supplied txnID.
func newLockTableKeyScanner(
ctx context.Context,
reader Reader,
txnID uuid.UUID,
str lock.Strength,
maxConflicts int64,
targetBytesPerConflict int64,
readCategory ReadCategory,
) (*lockTableKeyScanner, error) {
if txnID.Equal(uuid.UUID{}) && str == lock.None {
return nil, errors.AssertionFailedf(
"configuring the scanner with an empty transaction ID and no locking strength is nonsensical",
)
}
minConflictStr, err := minConflictLockStrength(str)
if err != nil {
return nil, err
}
iter, err := NewLockTableIterator(ctx, reader, LockTableIteratorOptions{
Prefix: true,
MatchTxnID: txnID,
MatchMinStr: minConflictStr,
ReadCategory: readCategory,
})
if err != nil {
return nil, err
}
s := lockTableKeyScannerPool.Get().(*lockTableKeyScanner)
s.iter = iter
s.txnID = txnID
s.maxConflicts = maxConflicts
s.targetBytesPerConflict = targetBytesPerConflict
return s, nil
}
func (s *lockTableKeyScanner) close() {
s.iter.Close()
*s = lockTableKeyScanner{ltKeyBuf: s.ltKeyBuf}
lockTableKeyScannerPool.Put(s)
}
// scan scans the lock table at the provided key for locks held by other
// transactions that conflict with the configured locking strength and for locks
// of any strength that the configured transaction has already acquired.
func (s *lockTableKeyScanner) scan(key roachpb.Key) error {
s.resetScanState()
for ok := s.seek(key); ok; ok = s.getOneAndAdvance() {
}
return s.afterScan()
}
// resetScanState resets the scanner's state before a scan.
func (s *lockTableKeyScanner) resetScanState() {
s.err = nil
s.conflicts = nil
s.conflictBytes = 0
for i := range s.ownLocks {
s.ownLocks[i] = nil
}
s.ltValue.Reset()
s.firstOwnLock.Reset()
}
// afterScan returns any error encountered during the scan.
func (s *lockTableKeyScanner) afterScan() error {
if s.err != nil {
return s.err
}
if len(s.conflicts) != 0 {
return &kvpb.LockConflictError{Locks: s.conflicts}
}
return nil
}
// seek seeks the iterator to the first lock table key associated with the
// provided key. Returns true if the scanner should continue scanning, false
// if not.
func (s *lockTableKeyScanner) seek(key roachpb.Key) bool {
var ltKey roachpb.Key
ltKey, s.ltKeyBuf = keys.LockTableSingleKey(key, s.ltKeyBuf)
valid, err := s.iter.SeekEngineKeyGE(EngineKey{Key: ltKey})
if err != nil {
s.err = err
}
return valid
}
// getOneAndAdvance consumes the current lock table key and value and advances
// the iterator. Returns true if the scanner should continue scanning, false if
// not.
func (s *lockTableKeyScanner) getOneAndAdvance() bool {
ltKey, ok := s.getLockTableKey()
if !ok {
return false
}
ltValue, ok := s.getLockTableValue()
if !ok {
return false
}
if !s.consumeLockTableKeyValue(ltKey, ltValue) {
return false
}
return s.advance()
}
// advance advances the iterator to the next lock table key.
func (s *lockTableKeyScanner) advance() bool {
valid, err := s.iter.NextEngineKey()
if err != nil {
s.err = err
}
return valid
}
// getLockTableKey decodes the current lock table key.
func (s *lockTableKeyScanner) getLockTableKey() (LockTableKey, bool) {
ltEngKey, err := s.iter.UnsafeEngineKey()
if err != nil {
s.err = err
return LockTableKey{}, false
}
ltKey, err := ltEngKey.ToLockTableKey()
if err != nil {
s.err = err
return LockTableKey{}, false
}
return ltKey, true
}
// getLockTableValue decodes the current lock table values.
func (s *lockTableKeyScanner) getLockTableValue() (*enginepb.MVCCMetadata, bool) {
err := s.iter.ValueProto(&s.ltValue)
if err != nil {
s.err = err
return nil, false
}
return &s.ltValue, true
}
// consumeLockTableKeyValue consumes the current lock table key and value, which
// is either a conflicting lock or a lock held by the scanning transaction.
func (s *lockTableKeyScanner) consumeLockTableKeyValue(
ltKey LockTableKey, ltValue *enginepb.MVCCMetadata,
) bool {
if ltValue.Txn == nil {
s.err = errors.AssertionFailedf("unexpectedly found non-transactional lock: %v", ltValue)
return false
}
if ltKey.TxnUUID != ltValue.Txn.ID {
s.err = errors.AssertionFailedf("lock table key (%+v) and value (%+v) txn ID mismatch", ltKey, ltValue)
return false
}
if ltKey.TxnUUID == s.txnID {
return s.consumeOwnLock(ltKey, ltValue)
}
return s.consumeConflictingLock(ltKey, ltValue)
}
// consumeOwnLock consumes a lock held by the scanning transaction.
func (s *lockTableKeyScanner) consumeOwnLock(
ltKey LockTableKey, ltValue *enginepb.MVCCMetadata,
) bool {
var ltValueCopy *enginepb.MVCCMetadata
if s.firstOwnLock.Txn == nil {
// This is the first lock held by the transaction that we've seen, so
// we can avoid the heap allocation.
ltValueCopy = &s.firstOwnLock
} else {
ltValueCopy = new(enginepb.MVCCMetadata)
}
// NOTE: this will alias internal pointer fields of ltValueCopy with those
// in ltValue, but this will not lead to issues when ltValue is updated by
// the next call to getLockTableValue, because its internal fields will be
// reset by protoutil.Unmarshal before unmarshalling.
*ltValueCopy = *ltValue
s.ownLocks[replicatedLockStrengthToIndexMap[ltKey.Strength]] = ltValueCopy
return true
}
// consumeConflictingLock consumes a conflicting lock.
func (s *lockTableKeyScanner) consumeConflictingLock(
ltKey LockTableKey, ltValue *enginepb.MVCCMetadata,
) bool {
conflict := roachpb.MakeLock(ltValue.Txn, ltKey.Key.Clone(), ltKey.Strength)
conflictSize := int64(conflict.Size())
s.conflictBytes += conflictSize
s.conflicts = append(s.conflicts, conflict)
if s.maxConflicts != 0 && s.maxConflicts == int64(len(s.conflicts)) {
return false
}
if s.targetBytesPerConflict != 0 && s.conflictBytes >= s.targetBytesPerConflict {
return false
}
return true
}
// foundOwn returns the lock table value for the provided strength if the
// transaction has already acquired a lock of that strength. Returns nil if not.
func (s *lockTableKeyScanner) foundOwn(str lock.Strength) *enginepb.MVCCMetadata {
return s.ownLocks[replicatedLockStrengthToIndexMap[str]]
}