kv: support FOR {UPDATE,SHARE} SKIP LOCKED #79134

Merged · 5 commits · Jun 30, 2022
22 changes: 21 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -689,11 +690,30 @@ func (ds *DistSender) initAndVerifyBatch(
// Accepted point requests that can be in batches with limit.

default:
return roachpb.NewErrorf("batch with limit contains %T request", inner)
return roachpb.NewErrorf("batch with limit contains %s request", inner.Method())
}
}
}

switch ba.WaitPolicy {
case lock.WaitPolicy_Block, lock.WaitPolicy_Error:
// Default. All request types supported.
case lock.WaitPolicy_SkipLocked:
for _, req := range ba.Requests {
inner := req.GetInner()
if !roachpb.CanSkipLocked(inner) {
switch inner.(type) {
case *roachpb.QueryIntentRequest, *roachpb.EndTxnRequest:
// Not directly supported, but can be part of the same batch.
default:
return roachpb.NewErrorf("batch with SkipLocked wait policy contains %s request", inner.Method())
}
}
}
default:
return roachpb.NewErrorf("unknown wait policy %s", ba.WaitPolicy)
}

return nil
}
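
For orientation, a minimal client-side sketch of a batch that this validation accepts; db, ctx, the keys, and the wrapper function are illustrative assumptions rather than part of this change:

// Sketch: a read-only batch using the SkipLocked wait policy. Gets,
// Scans, and ReverseScans pass the check above; a write such as
// b.Put(...) would be rejected by initAndVerifyBatch with
// "batch with SkipLocked wait policy contains Put request".
func skipLockedReads(ctx context.Context, db *kv.DB) error {
	var b kv.Batch
	b.Header.WaitPolicy = lock.WaitPolicy_SkipLocked
	b.Get(roachpb.Key("a"))
	b.Scan(roachpb.Key("b"), roachpb.Key("c"))
	return db.Run(ctx, &b)
}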

3 changes: 1 addition & 2 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
@@ -562,13 +562,12 @@ func (sr *txnSpanRefresher) tryRefreshTxnSpans(
func (sr *txnSpanRefresher) appendRefreshSpans(
ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse,
) error {
- ba.RefreshSpanIterate(br, func(span roachpb.Span) {
+ return ba.RefreshSpanIterate(br, func(span roachpb.Span) {
if log.ExpensiveLogEnabled(ctx, 3) {
log.VEventf(ctx, 3, "recording span to refresh: %s", span.String())
}
sr.refreshFootprint.insert(span)
})
- return nil
}

// canForwardReadTimestampWithoutRefresh returns whether the transaction can
26 changes: 26 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_test.go
@@ -771,6 +771,8 @@ func TestTxnWaitPolicies(t *testing.T) {

testutils.RunTrueAndFalse(t, "highPriority", func(t *testing.T, highPriority bool) {
key := []byte("b")
require.NoError(t, s.DB.Put(ctx, key, "old value"))

txn := s.DB.NewTxn(ctx, "test txn")
require.NoError(t, txn.Put(ctx, key, "new value"))

@@ -819,6 +821,30 @@
require.True(t, errors.As(err, &wiErr))
require.Equal(t, roachpb.WriteIntentError_REASON_WAIT_POLICY, wiErr.Reason)

// SkipLocked wait policy.
type skipRes struct {
res []kv.Result
err error
}
skipC := make(chan skipRes)
go func() {
var b kv.Batch
b.Header.UserPriority = pri
b.Header.WaitPolicy = lock.WaitPolicy_SkipLocked
b.Get(key)
err := s.DB.Run(ctx, &b)
skipC <- skipRes{res: b.Results, err: err}
}()

// Should immediately return a successful but empty result, without blocking.
// Priority does not matter.
res := <-skipC
require.Nil(t, res.err)
require.Len(t, res.res, 1)
getRes := res.res[0]
require.Len(t, getRes.Rows, 1)
require.False(t, getRes.Rows[0].Exists())

// Let blocked requests proceed.
require.NoError(t, txn.Commit(ctx))
if !highPriority {
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -125,6 +125,7 @@ go_library(
"//pkg/kv/kvserver/closedts/sidetransport",
"//pkg/kv/kvserver/closedts/tracker",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/concurrency/poison",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/gc",
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_get.go
@@ -54,10 +54,12 @@ func Get(
var err error
val, intent, err = storage.MVCCGet(ctx, reader, args.Key, h.Timestamp, storage.MVCCGetOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
FailOnMoreRecent: args.KeyLocking != lock.None,
Uncertainty: cArgs.Uncertainty,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
})
if err != nil {
return result.Result{}, err
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_reverse_scan.go
@@ -44,6 +44,7 @@ func ReverseScan(
clusterversion.TargetBytesAvoidExcess)
opts := storage.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
MaxKeys: h.MaxSpanRequestKeys,
MaxIntents: storage.MaxIntentsPerWriteIntentError.Get(&cArgs.EvalCtx.ClusterSettings().SV),
@@ -54,6 +55,7 @@
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: true,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
}

switch args.ScanFormat {
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_scan.go
@@ -44,6 +44,7 @@ func Scan(
clusterversion.TargetBytesAvoidExcess)
opts := storage.MVCCScanOptions{
Inconsistent: h.ReadConsistency != roachpb.CONSISTENT,
SkipLocked: h.WaitPolicy == lock.WaitPolicy_SkipLocked,
Txn: h.Txn,
Uncertainty: cArgs.Uncertainty,
MaxKeys: h.MaxSpanRequestKeys,
@@ -55,6 +56,7 @@
FailOnMoreRecent: args.KeyLocking != lock.None,
Reverse: false,
MemoryAccount: cArgs.EvalCtx.GetResponseMemoryAccount(),
LockTable: cArgs.Concurrency,
}

switch args.ScanFormat {
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/batcheval/declare.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
@@ -121,5 +122,6 @@
Now hlc.ClockTimestamp
// *Stats should be mutated to reflect any writes made by the command.
Stats *enginepb.MVCCStats
Concurrency *concurrency.Guard
Uncertainty uncertainty.Interval
}
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -767,6 +767,14 @@
// so this checking is practically only going to find unreplicated locks
// that conflict.
CheckOptimisticNoConflicts(*spanset.SpanSet) (ok bool)

// IsKeyLockedByConflictingTxn returns whether the provided key is locked by a
// conflicting transaction in the lockTableGuard's snapshot of the lock table,
// given the caller's own desired locking strength. If so, the lock holder is
// returned. A transaction's own lock does not appear as locked to itself.
// The method is used by requests in conjunction with the SkipLocked wait
// policy to determine which keys they should skip over during evaluation.
IsKeyLockedByConflictingTxn(roachpb.Key, lock.Strength) (bool, *enginepb.TxnMeta)
}

// lockTableWaiter is concerned with waiting in lock wait-queues for locks held
12 changes: 12 additions & 0 deletions pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -732,6 +732,18 @@ func (g *Guard) CheckOptimisticNoLatchConflicts() (ok bool) {
return g.lm.CheckOptimisticNoConflicts(g.lg, g.Req.LatchSpans)
}

// IsKeyLockedByConflictingTxn returns whether the provided key is locked by a
// conflicting transaction in the Guard's snapshot of the lock table, given the
// caller's own desired locking strength. If so, the lock holder is returned. A
// transaction's own lock does not appear as locked to itself. The method is
// used by requests in conjunction with the SkipLocked wait policy to determine
// which keys they should skip over during evaluation.
func (g *Guard) IsKeyLockedByConflictingTxn(
key roachpb.Key, strength lock.Strength,
) (bool, *enginepb.TxnMeta) {
return g.ltg.IsKeyLockedByConflictingTxn(key, strength)
}
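
As a hedged illustration of the intended call pattern (the wrapper and addRow callback are assumptions; the real callers are the MVCC get/scan paths, wired up via the LockTable option added in the batcheval files above):

// Sketch: a SkipLocked read consults the Guard before adding each key
// to its result set. str is lock.None for non-locking reads and a
// stronger level (e.g. lock.Exclusive) for locking reads.
func maybeAddKey(g *concurrency.Guard, str lock.Strength, key roachpb.Key, addRow func(roachpb.Key)) {
	if locked, _ := g.IsKeyLockedByConflictingTxn(key, str); locked {
		return // locked by another transaction: skip the key silently
	}
	addRow(key)
}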

func (g *Guard) moveLatchGuard() latchGuard {
lg := g.lg
g.lg = nil
18 changes: 17 additions & 1 deletion pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -65,7 +65,8 @@ import (
// handle-write-intent-error req=<req-name> txn=<txn-name> key=<key> lease-seq=<seq>
// handle-txn-push-error req=<req-name> txn=<txn-name> key=<key> TODO(nvanbenschoten): implement this
//
// check-opt-no-conflicts req=<req-name>
// is-key-locked-by-conflicting-txn req=<req-name> key=<key> strength=<strength>
//
// on-lock-acquired req=<req-name> key=<key> [seq=<seq>] [dur=r|u]
// on-lock-updated req=<req-name> txn=<txn-name> key=<key> status=[committed|aborted|pending] [ts=<int>[,<int>]]
@@ -347,6 +348,21 @@ func TestConcurrencyManagerBasic(t *testing.T) {
latchSpans, lockSpans := c.collectSpans(t, g.Req.Txn, g.Req.Timestamp, reqs)
return fmt.Sprintf("no-conflicts: %t", g.CheckOptimisticNoConflicts(latchSpans, lockSpans))

case "is-key-locked-by-conflicting-txn":
var reqName string
d.ScanArgs(t, "req", &reqName)
g, ok := c.guardsByReqName[reqName]
if !ok {
d.Fatalf(t, "unknown request: %s", reqName)
}
var key string
d.ScanArgs(t, "key", &key)
strength := scanLockStrength(t, d)
if ok, txn := g.IsKeyLockedByConflictingTxn(roachpb.Key(key), strength); ok {
return fmt.Sprintf("locked: true, holder: %s", txn.ID)
}
return "locked: false"

case "on-lock-acquired":
var reqName string
d.ScanArgs(t, "req", &reqName)
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/concurrency/datadriven_util_test.go
@@ -44,6 +44,24 @@ func scanTimestampWithName(t *testing.T, d *datadriven.TestData, name string) hl
return ts
}

func scanLockStrength(t *testing.T, d *datadriven.TestData) lock.Strength {
var strS string
d.ScanArgs(t, "strength", &strS)
switch strS {
case "none":
return lock.None
case "shared":
return lock.Shared
case "upgrade":
return lock.Upgrade
case "exclusive":
return lock.Exclusive
default:
d.Fatalf(t, "unknown lock strength: %s", strS)
return 0
}
}
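
A hedged example of the datadriven directive this parser backs, following the command syntax documented in concurrency_manager_test.go above (the request name, key, and holder ID are made up):

is-key-locked-by-conflicting-txn req=req1 key=k strength=exclusive
----
locked: true, holder: 00000001-0000-0000-0000-000000000000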

func scanLockDurability(t *testing.T, d *datadriven.TestData) lock.Durability {
var durS string
d.ScanArgs(t, "dur", &durS)
@@ -70,6 +88,8 @@ func scanWaitPolicy(t *testing.T, d *datadriven.TestData, required bool) lock.Wa
return lock.WaitPolicy_Block
case "error":
return lock.WaitPolicy_Error
case "skip-locked":
return lock.WaitPolicy_SkipLocked
default:
d.Fatalf(t, "unknown wait policy: %s", policy)
return 0
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/lock/locking.go
@@ -12,7 +12,7 @@
// concurrency control in the key-value layer.
package lock

- import fmt "fmt"
+ import "fmt"

// MaxDurability is the maximum value in the Durability enum.
const MaxDurability = Unreplicated
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/concurrency/lock/locking.proto
@@ -167,15 +167,21 @@ enum Durability {
// until the conflicting lock is released, but other policies can make sense in
// special situations.
enum WaitPolicy {
- // Block indicates that if a request encounters a conflicting locks held by
+ // Block indicates that if a request encounters a conflicting lock held by
// another active transaction, it should wait for the conflicting lock to be
// released before proceeding.
Block = 0;

- // Error indicates that if a request encounters a conflicting locks held by
+ // Error indicates that if a request encounters a conflicting lock held by
// another active transaction, it should raise an error instead of blocking.
// If the request encounters a conflicting lock that was abandoned by an
// inactive transaction, which is likely due to a transaction coordinator
// crash, the lock is removed and no error is raised.
Error = 1;

// SkipLocked indicates that if a request encounters a conflicting lock held
// by another transaction while scanning, it should skip over the key that is
// locked instead of blocking and later acquiring a lock on that key. The
// locked key will not be included in the scan result.
SkipLocked = 2;
}
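
For a hedged side-by-side of the three policies against the same held lock (client-level sketch; db, ctx, and key are placeholders, and another transaction is assumed to hold a lock on key):

// Sketch: the same conflicting Get under each wait policy.
func tryPolicies(ctx context.Context, db *kv.DB, key roachpb.Key) {
	for _, wp := range []lock.WaitPolicy{
		lock.WaitPolicy_Block,      // waits until the conflicting lock is released
		lock.WaitPolicy_Error,      // fails fast with a WriteIntentError
		lock.WaitPolicy_SkipLocked, // succeeds; the locked row is simply absent
	} {
		var b kv.Batch
		b.Header.WaitPolicy = wp
		b.Get(key)
		_ = db.Run(ctx, &b) // with Block, this call waits out the conflict
	}
}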
45 changes: 45 additions & 0 deletions pkg/kv/kvserver/concurrency/lock_table.go
@@ -364,6 +364,7 @@ type lockTableGuardImpl struct {
txn *enginepb.TxnMeta
ts hlc.Timestamp
spans *spanset.SpanSet
waitPolicy lock.WaitPolicy
maxWaitQueueLength int

// Snapshots of the trees for which this request has some spans. Note that
@@ -542,6 +543,11 @@
}

func (g *lockTableGuardImpl) CheckOptimisticNoConflicts(spanSet *spanset.SpanSet) (ok bool) {
if g.waitPolicy == lock.WaitPolicy_SkipLocked {
// If the request is using a SkipLocked wait policy, lock conflicts are
// handled during evaluation.
return true
}
// Temporarily replace the SpanSet in the guard.
originalSpanSet := g.spans
g.spans = spanSet
@@ -568,6 +574,35 @@
return true
}

func (g *lockTableGuardImpl) IsKeyLockedByConflictingTxn(
key roachpb.Key, strength lock.Strength,
) (bool, *enginepb.TxnMeta) {
ss := spanset.SpanGlobal
if keys.IsLocal(key) {
ss = spanset.SpanLocal
}
tree := g.tableSnapshot[ss]
iter := tree.MakeIter()
iter.SeekGE(&lockState{key: key})
if !iter.Valid() || !iter.Cur().key.Equal(key) {
// No lock on key.
return false, nil
}
l := iter.Cur()
l.mu.Lock()
defer l.mu.Unlock()
lockHolderTxn, lockHolderTS := l.getLockHolder()
if lockHolderTxn != nil && g.isSameTxn(lockHolderTxn) {
// Already locked by this txn.
return false, nil
}
if strength == lock.None && g.ts.Less(lockHolderTS) {
// Non-locking read below lock's timestamp.
return false, nil
}
return true, lockHolderTxn
}
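
To make the early-return cases above concrete, a worked illustration (transaction IDs and timestamps are invented):

// Suppose key "k" is locked by txn T1 at timestamp 10.
//   - T1 itself asks about "k" (any strength)     -> (false, nil): own lock.
//   - Non-locking read (lock.None) at timestamp 5 -> (false, nil): a read
//     below the lock's timestamp does not conflict with it.
//   - Non-locking read at timestamp 20            -> (true, T1's TxnMeta).
//   - Locking read (e.g. lock.Exclusive), any ts  -> (true, T1's TxnMeta).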

func (g *lockTableGuardImpl) notify() {
select {
case g.mu.signal <- struct{}{}:
@@ -2413,6 +2448,15 @@ func (t *lockTableImpl) ScanAndEnqueue(req Request, guard lockTableGuard) lockTa
g.toResolve = g.toResolve[:0]
}
t.doSnapshotForGuard(g)

if g.waitPolicy == lock.WaitPolicy_SkipLocked {
// If the request is using a SkipLocked wait policy, it captures a lockTable
// snapshot but does not scan the lock table when sequencing. Instead, it
// calls into IsKeyLockedByConflictingTxn before adding keys to its result
// set to determine which keys it should skip.
return g
}

g.findNextLockAfter(true /* notify */)
if g.notRemovableLock != nil {
// Either waiting at the notRemovableLock, or elsewhere. Either way we are
@@ -2430,6 +2474,7 @@
g.txn = req.txnMeta()
g.ts = req.Timestamp
g.spans = req.LockSpans
g.waitPolicy = req.WaitPolicy
g.maxWaitQueueLength = req.MaxLockWaitQueueLength
g.sa = spanset.NumSpanAccess - 1
g.index = -1