From ded4a8bb420cc0df0e753a14a6aa5ba1a362b81a Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Wed, 27 Sep 2023 10:36:52 -0500 Subject: [PATCH] kvserver: hook up {shared, exclusive} replicated locks end to end Now that both the storage layer and the concurrency manager have been taught about {shared, exclusive} replicated locks, we can hook things up end to end. This patch does so by: - Checking for conflicts with replicated locks during lock acquisition of an unreplicated lock. - Checking for conflicts and acquiring a replicated lock during lock acquisition of a replicated lock. Closes #109672 Informs #100193 Release note: None --- pkg/kv/kvserver/batcheval/cmd_get.go | 2 +- pkg/kv/kvserver/batcheval/cmd_reverse_scan.go | 5 ++- pkg/kv/kvserver/batcheval/cmd_scan.go | 5 ++- pkg/kv/kvserver/batcheval/intent.go | 45 +++++++++++-------- 4 files changed, 34 insertions(+), 23 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_get.go b/pkg/kv/kvserver/batcheval/cmd_get.go index 92d675520e8a..1791f80746c6 100644 --- a/pkg/kv/kvserver/batcheval/cmd_get.go +++ b/pkg/kv/kvserver/batcheval/cmd_get.go @@ -85,7 +85,7 @@ func Get( var res result.Result if args.KeyLockingStrength != lock.None && h.Txn != nil && getRes.Value != nil { acq, err := acquireLockOnKey(ctx, readWriter, h.Txn, args.KeyLockingStrength, - args.KeyLockingDurability, args.Key) + args.KeyLockingDurability, args.Key, cArgs.Stats, cArgs.EvalCtx.ClusterSettings()) if err != nil { return result.Result{}, err } diff --git a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go index 2ecfb75be871..45a734814b2c 100644 --- a/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_reverse_scan.go @@ -110,8 +110,9 @@ func ReverseScan( } if args.KeyLockingStrength != lock.None && h.Txn != nil { - acquiredLocks, err := acquireLocksOnKeys(ctx, readWriter, h.Txn, args.KeyLockingStrength, - args.KeyLockingDurability, args.ScanFormat, &scanRes) + acquiredLocks, err := acquireLocksOnKeys( + ctx, readWriter, h.Txn, args.KeyLockingStrength, args.KeyLockingDurability, + args.ScanFormat, &scanRes, cArgs.Stats, cArgs.EvalCtx.ClusterSettings()) if err != nil { return result.Result{}, err } diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index 6de565175772..7d251655a541 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -110,8 +110,9 @@ func Scan( } if args.KeyLockingStrength != lock.None && h.Txn != nil { - acquiredLocks, err := acquireLocksOnKeys(ctx, readWriter, h.Txn, args.KeyLockingStrength, - args.KeyLockingDurability, args.ScanFormat, &scanRes) + acquiredLocks, err := acquireLocksOnKeys( + ctx, readWriter, h.Txn, args.KeyLockingStrength, args.KeyLockingDurability, + args.ScanFormat, &scanRes, cArgs.Stats, cArgs.EvalCtx.ClusterSettings()) if err != nil { return result.Result{}, err } diff --git a/pkg/kv/kvserver/batcheval/intent.go b/pkg/kv/kvserver/batcheval/intent.go index 1b2d31a66768..2cfb008b04df 100644 --- a/pkg/kv/kvserver/batcheval/intent.go +++ b/pkg/kv/kvserver/batcheval/intent.go @@ -16,7 +16,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/errors" ) @@ -119,6 +121,8 @@ func acquireLocksOnKeys( dur lock.Durability, scanFmt kvpb.ScanFormat, scanRes *storage.MVCCScanResult, + ms *enginepb.MVCCStats, + settings *cluster.Settings, ) ([]roachpb.LockAcquisition, error) { acquiredLocks := make([]roachpb.LockAcquisition, scanRes.NumKeys) switch scanFmt { @@ -126,7 +130,7 @@ func acquireLocksOnKeys( var i int err := storage.MVCCScanDecodeKeyValues(scanRes.KVData, func(key storage.MVCCKey, _ []byte) error { k := copyKey(key.Key) - acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k) + acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k, ms, settings) if err != nil { return err } @@ -141,7 +145,7 @@ func acquireLocksOnKeys( case kvpb.KEY_VALUES: for i, row := range scanRes.KVs { k := copyKey(row.Key) - acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k) + acq, err := acquireLockOnKey(ctx, readWriter, txn, str, dur, k, ms, settings) if err != nil { return nil, err } @@ -169,26 +173,31 @@ func acquireLockOnKey( str lock.Strength, dur lock.Durability, key roachpb.Key, + ms *enginepb.MVCCStats, + settings *cluster.Settings, ) (roachpb.LockAcquisition, error) { - // TODO(arul,nvanbenschoten): For now, we're only checking whether we have - // access to a legit pebble.Writer for replicated lock acquisition. We're not - // actually acquiring a replicated lock -- we can only do so once they're - // fully supported in the storage package. Until then, we grab an unreplicated - // lock regardless of what the caller asked us to do. - if dur == lock.Replicated { - // ShouldWriteLocalTimestamp is only implemented by a pebble.Writer; it'll - // panic if we were on the read-only evaluation path, and only had access to - // a pebble.ReadOnly. - readWriter.ShouldWriteLocalTimestamps(ctx) - // Regardless of what the caller asked for, we'll give it an unreplicated - // lock. - dur = lock.Unreplicated - } + maxLockConflicts := storage.MaxConflictsPerLockConflictError.Get(&settings.SV) switch dur { case lock.Unreplicated: - // TODO(arul,nvanbenschoten): Call into MVCCCheckForAcquireLockHere. + // Evaluation up until this point has only scanned for (and not found any) + // conflicts with locks in the in-memory lock table. This includes all + // unreplicated locks and contended replicated locks. We haven't considered + // conflicts with un-contended replicated locks -- we need to do so before + // we can acquire our own unreplicated lock; do so now. + err := storage.MVCCCheckForAcquireLock(ctx, readWriter, txn, str, key, maxLockConflicts) + if err != nil { + return roachpb.LockAcquisition{}, err + } case lock.Replicated: - // TODO(arul,nvanbenschoten): Call into MVCCAcquireLock here. + // Evaluation up until this point has only scanned for (and not found any) + // conflicts with locks in the in-memory lock table. This includes all + // unreplicated locks and contended replicated locks. We haven't considered + // conflicts with un-contended replicated locks -- we need to do so before + // we can acquire our own replicated lock; do that now, and also acquire + // the replicated lock if no conflicts are found. + if err := storage.MVCCAcquireLock(ctx, readWriter, txn, str, key, ms, maxLockConflicts); err != nil { + return roachpb.LockAcquisition{}, err + } default: panic("unexpected lock durability") }