Skip to content

Commit

Permalink
kv: fix conflict resolution for high-priority, non-txn'al requests
Browse files Browse the repository at this point in the history
Fixes #83342.

This commit reworks the behavior of non-transactional requests in lock
wait-queues when the lock holder or the lock queue waiter has an extreme
priority (min or max priority). In such cases, we allow the lock queue
waiter to immediately push the lock holder out of its way, either by
moving its timestamp to resolve a read-write conflict or aborting it to
resolve a write-write conflict.

This handling was broken in two ways for non-transactional requests.
1. these requests' priorities were not consulted when deciding whether
   to immediately push instead of temporarily delaying while waiting in the
   lock wait-queue. This meant that a high-priority, non-txn request might
   still wait for 50ms (kv.lock_table.coordinator_liveness_push_delay)
   before pushing a lower priority lock holder out of its way.
2. worse, it was possible that if these requests were not in the front
   of a lock wait-queue, they might never push. This was because we had
   logic that disabled a push if it was not needed for the purposes of
   checking liveness, detecting deadlocks, or enforcing timeouts.

This commit resolves both of these issues. It also improves the testing
of transaction priorities in the `kv/kvserver/concurrency` package.
Finally, it consolidates the determination of when a pusher should be
able to push/abort a pushee into a single location.

Release note (bug fix): a bug in transaction conflict resolution which
could allow backups to wait on long-running transactions has been
resolved.
  • Loading branch information
nvanbenschoten committed Jul 5, 2022
1 parent cff9d8b commit 83ac099
Show file tree
Hide file tree
Showing 10 changed files with 695 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func PushTxn(
// If just attempting to cleanup old or already-committed txns,
// pusher always fails.
pusherWins = false
case CanPushWithPriority(&args.PusherTxn, &reply.PusheeTxn):
case txnwait.CanPushWithPriority(args.PusherTxn.Priority, reply.PusheeTxn.Priority):
reason = "pusher has priority"
pusherWins = true
case args.Force:
Expand Down
7 changes: 0 additions & 7 deletions pkg/kv/kvserver/batcheval/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,6 @@ func UpdateAbortSpan(
return rec.AbortSpan().Put(ctx, readWriter, ms, txn.ID, &curEntry)
}

// CanPushWithPriority returns true if the given pusher can push the pushee
// based on its priority.
func CanPushWithPriority(pusher, pushee *roachpb.Transaction) bool {
return (pusher.Priority > enginepb.MinTxnPriority && pushee.Priority == enginepb.MinTxnPriority) ||
(pusher.Priority == enginepb.MaxTxnPriority && pushee.Priority < pusher.Priority)
}

// CanCreateTxnRecord determines whether a transaction record can be created for
// the provided transaction. If not, the function will return an error. If so,
// the function may modify the provided transaction.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/concurrency/concurrency_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ type Request struct {
Timestamp hlc.Timestamp

// The priority of the request. Only set if Txn is nil.
Priority roachpb.UserPriority
NonTxnPriority roachpb.UserPriority

// The consistency level of the request. Only set if Txn is nil.
ReadConsistency roachpb.ReadConsistencyType
Expand Down
63 changes: 48 additions & 15 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import (
//
// The input files use the following DSL:
//
// new-txn name=<txn-name> ts=<int>[,<int>] epoch=<int> [uncertainty-limit=<int>[,<int>]]
// new-txn name=<txn-name> ts=<int>[,<int>] [epoch=<int>] [priority] [uncertainty-limit=<int>[,<int>]]
// new-request name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [inconsistent] [wait-policy=<policy>] [lock-timeout] [max-lock-wait-queue-length=<int>] [poison-policy=[err|wait]]
// <proto-name> [<field-name>=<field-value>...] (hint: see scanSingleRequest)
// sequence req=<req-name> [eval-kind=<pess|opt|pess-after-opt]
Expand Down Expand Up @@ -104,8 +104,12 @@ func TestConcurrencyManagerBasic(t *testing.T) {
d.ScanArgs(t, "name", &txnName)
ts := scanTimestamp(t, d)

var epoch int
d.ScanArgs(t, "epoch", &epoch)
epoch := 0
if d.HasArg("epoch") {
d.ScanArgs(t, "epoch", &epoch)
}

priority := scanTxnPriority(t, d)

uncertaintyLimit := ts
if d.HasArg("uncertainty-limit") {
Expand All @@ -125,7 +129,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
Epoch: enginepb.TxnEpoch(epoch),
WriteTimestamp: ts,
MinTimestamp: ts,
Priority: 1, // not min or max
Priority: priority,
},
ReadTimestamp: ts,
GlobalUncertaintyLimit: uncertaintyLimit,
Expand Down Expand Up @@ -159,6 +163,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
readConsistency = roachpb.INCONSISTENT
}

priority := scanUserPriority(t, d)
waitPolicy := scanWaitPolicy(t, d, false /* required */)

var lockTimeout time.Duration
Expand All @@ -182,9 +187,9 @@ func TestConcurrencyManagerBasic(t *testing.T) {
latchSpans, lockSpans := c.collectSpans(t, txn, ts, reqs)

c.requestsByName[reqName] = concurrency.Request{
Txn: txn,
Timestamp: ts,
// TODO(nvanbenschoten): test Priority
Txn: txn,
Timestamp: ts,
NonTxnPriority: priority,
ReadConsistency: readConsistency,
WaitPolicy: waitPolicy,
LockTimeout: lockTimeout,
Expand Down Expand Up @@ -693,19 +698,47 @@ func (c *cluster) PushTransaction(
}
defer c.unregisterPush(push)
}
var pusherPriority enginepb.TxnPriority
if h.Txn != nil {
pusherPriority = h.Txn.Priority
} else {
pusherPriority = roachpb.MakePriority(h.UserPriority)
}
pushTo := h.Timestamp.Next()
for {
// Is the pushee pushed?
pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn()
var pushed bool
switch pushType {
case roachpb.PUSH_TIMESTAMP:
pushed = h.Timestamp.Less(pusheeTxn.WriteTimestamp) || pusheeTxn.Status.IsFinalized()
case roachpb.PUSH_ABORT, roachpb.PUSH_TOUCH:
pushed = pusheeTxn.Status.IsFinalized()
// NOTE: this logic is adapted from cmd_push_txn.go.
var pusherWins bool
switch {
case pusheeTxn.Status.IsFinalized():
// Already finalized.
return pusheeTxn, nil
case pushType == roachpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
// Already pushed.
return pusheeTxn, nil
case pushType == roachpb.PUSH_TOUCH:
pusherWins = false
case txnwait.CanPushWithPriority(pusherPriority, pusheeTxn.Priority):
pusherWins = true
default:
return nil, roachpb.NewErrorf("unexpected push type: %s", pushType)
pusherWins = false
}
if pushed {
if pusherWins {
switch pushType {
case roachpb.PUSH_ABORT:
log.Eventf(ctx, "pusher aborted pushee")
err = c.updateTxnRecord(pusheeTxn.ID, roachpb.ABORTED, pusheeTxn.WriteTimestamp)
case roachpb.PUSH_TIMESTAMP:
log.Eventf(ctx, "pusher pushed pushee to %s", pushTo)
err = c.updateTxnRecord(pusheeTxn.ID, pusheeTxn.Status, pushTo)
default:
err = errors.Errorf("unexpected push type: %s", pushType)
}
if err != nil {
return nil, roachpb.NewError(err)
}
pusheeTxn, _ = pusheeRecord.asTxn()
return pusheeTxn, nil
}
// If PUSH_TOUCH, return error instead of waiting.
Expand Down
35 changes: 35 additions & 0 deletions pkg/kv/kvserver/concurrency/datadriven_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,41 @@ func scanTimestampWithName(t *testing.T, d *datadriven.TestData, name string) hl
return ts
}

func scanTxnPriority(t *testing.T, d *datadriven.TestData) enginepb.TxnPriority {
priority := scanUserPriority(t, d)
// NB: don't use roachpb.MakePriority to avoid randomness.
switch priority {
case roachpb.MinUserPriority:
return enginepb.MinTxnPriority
case roachpb.NormalUserPriority:
return 1 // not min nor max
case roachpb.MaxUserPriority:
return enginepb.MaxTxnPriority
default:
d.Fatalf(t, "unknown priority: %s", priority)
return 0
}
}

func scanUserPriority(t *testing.T, d *datadriven.TestData) roachpb.UserPriority {
const key = "priority"
priS := "normal"
if d.HasArg(key) {
d.ScanArgs(t, key, &priS)
}
switch priS {
case "low":
return roachpb.MinUserPriority
case "normal":
return roachpb.NormalUserPriority
case "high":
return roachpb.MaxUserPriority
default:
d.Fatalf(t, "unknown priority: %s", priS)
return 0
}
}

func scanLockStrength(t *testing.T, d *datadriven.TestData) lock.Strength {
var strS string
d.ScanArgs(t, "strength", &strS)
Expand Down
36 changes: 22 additions & 14 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/txnwait"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
Expand Down Expand Up @@ -221,9 +222,14 @@ func (w *lockTableWaiterImpl) WaitOn(
// still active.
timeoutPush := req.LockTimeout != 0

// If the pushee has the minimum priority or if the pusher has the
// maximum priority, push immediately to proceed without queueing.
// The push should succeed without entering the txn wait-queue.
priorityPush := canPushWithPriority(req, state)

// If the request doesn't want to perform a delayed push for any
// reason, continue waiting without a timer.
if !livenessPush && !deadlockPush && !timeoutPush {
if !(livenessPush || deadlockPush || timeoutPush || priorityPush) {
continue
}

Expand All @@ -250,12 +256,7 @@ func (w *lockTableWaiterImpl) WaitOn(
}
delay = minDuration(delay, w.timeUntilDeadline(lockDeadline))
}

// However, if the pushee has the minimum priority or if the
// pusher has the maximum priority, push immediately.
// TODO(nvanbenschoten): flesh these interactions out more and
// add some testing.
if hasMinPriority(state.txn) || hasMaxPriority(req.Txn) {
if priorityPush {
delay = 0
}

Expand Down Expand Up @@ -754,7 +755,7 @@ func (w *lockTableWaiterImpl) pushRequestTxn(
func (w *lockTableWaiterImpl) pushHeader(req Request) roachpb.Header {
h := roachpb.Header{
Timestamp: req.Timestamp,
UserPriority: req.Priority,
UserPriority: req.NonTxnPriority,
}
if req.Txn != nil {
// We are going to hand the header (and thus the transaction proto) to
Expand Down Expand Up @@ -1224,12 +1225,19 @@ func newWriteIntentErr(
return err
}

func hasMinPriority(txn *enginepb.TxnMeta) bool {
return txn != nil && txn.Priority == enginepb.MinTxnPriority
}

func hasMaxPriority(txn *roachpb.Transaction) bool {
return txn != nil && txn.Priority == enginepb.MaxTxnPriority
func canPushWithPriority(req Request, s waitingState) bool {
var pusher, pushee enginepb.TxnPriority
if req.Txn != nil {
pusher = req.Txn.Priority
} else {
pusher = roachpb.MakePriority(req.NonTxnPriority)
}
if s.txn == nil {
// Can't push a non-transactional request.
return false
}
pushee = s.txn.Priority
return txnwait.CanPushWithPriority(pusher, pushee)
}

func minDuration(a, b time.Duration) time.Duration {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,8 +241,8 @@ func TestLockTableWaiterWithNonTxn(t *testing.T) {
reqHeaderTS := hlc.Timestamp{WallTime: 10}
makeReq := func() Request {
return Request{
Timestamp: reqHeaderTS,
Priority: roachpb.NormalUserPriority,
Timestamp: reqHeaderTS,
NonTxnPriority: roachpb.NormalUserPriority,
}
}

Expand Down
Loading

0 comments on commit 83ac099

Please sign in to comment.