From 83ac099b2d4ad0c9f036458f072bb94acb8da7a2 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 24 Jun 2022 20:45:33 -0400 Subject: [PATCH] kv: fix conflict resolution for high-priority, non-txn'al requests Fixes #83342. This commit reworks the behavior of non-transactional requests in lock wait-queues when the lock holder or the lock queue waiter has an extreme priority (min or max priority). In such cases, we allow the lock queue waiter to immediately push the lock holder out of its way, either by moving its timestamp to resolve a read-write conflict or aborting it to resolve a write-write conflict. This handling was broken in two ways for non-transactional requests. 1. these requests' priorities were not consulted when deciding whether to immediately push instead of temporarily delaying while waiting in the lock wait-queue. This meant that a high-priority, non-txn request might still wait for 50ms (kv.lock_table.coordinator_liveness_push_delay) before pushing a lower priority lock holder out of its way. 2. worse, it was possible that if these requests were not in the front of a lock wait-queue, they might never push. This was because we had logic that disabled a push if it was not needed for the purposes of checking liveness, detecting deadlocks, or enforcing timeouts. This commit resolves both of these issues. It also improves the testing of transaction priorities in the `kv/kvserver/concurrency` package. Finally, it consolidates the determination of when a pusher should be able to push/abort a pushee into a single location. Release note (bug fix): a bug in transaction conflict resolution which could allow backups to wait on long-running transactions has been resolved. --- pkg/kv/kvserver/batcheval/cmd_push_txn.go | 2 +- pkg/kv/kvserver/batcheval/transaction.go | 7 - .../concurrency/concurrency_control.go | 2 +- .../concurrency/concurrency_manager_test.go | 63 +- .../concurrency/datadriven_util_test.go | 35 ++ .../kvserver/concurrency/lock_table_waiter.go | 36 +- .../concurrency/lock_table_waiter_test.go | 4 +- .../testdata/concurrency_manager/priority | 575 ++++++++++++++++++ pkg/kv/kvserver/replica_send.go | 2 +- pkg/kv/kvserver/txnwait/queue.go | 14 +- 10 files changed, 695 insertions(+), 45 deletions(-) create mode 100644 pkg/kv/kvserver/concurrency/testdata/concurrency_manager/priority diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go index 8134aa90d086..cc99f4c72281 100644 --- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go @@ -255,7 +255,7 @@ func PushTxn( // If just attempting to cleanup old or already-committed txns, // pusher always fails. pusherWins = false - case CanPushWithPriority(&args.PusherTxn, &reply.PusheeTxn): + case txnwait.CanPushWithPriority(args.PusherTxn.Priority, reply.PusheeTxn.Priority): reason = "pusher has priority" pusherWins = true case args.Force: diff --git a/pkg/kv/kvserver/batcheval/transaction.go b/pkg/kv/kvserver/batcheval/transaction.go index c803ed1a3dd9..bd374efda956 100644 --- a/pkg/kv/kvserver/batcheval/transaction.go +++ b/pkg/kv/kvserver/batcheval/transaction.go @@ -118,13 +118,6 @@ func UpdateAbortSpan( return rec.AbortSpan().Put(ctx, readWriter, ms, txn.ID, &curEntry) } -// CanPushWithPriority returns true if the given pusher can push the pushee -// based on its priority. -func CanPushWithPriority(pusher, pushee *roachpb.Transaction) bool { - return (pusher.Priority > enginepb.MinTxnPriority && pushee.Priority == enginepb.MinTxnPriority) || - (pusher.Priority == enginepb.MaxTxnPriority && pushee.Priority < pusher.Priority) -} - // CanCreateTxnRecord determines whether a transaction record can be created for // the provided transaction. If not, the function will return an error. If so, // the function may modify the provided transaction. diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go index a3fe9054a10a..ee640416c7a3 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_control.go +++ b/pkg/kv/kvserver/concurrency/concurrency_control.go @@ -380,7 +380,7 @@ type Request struct { Timestamp hlc.Timestamp // The priority of the request. Only set if Txn is nil. - Priority roachpb.UserPriority + NonTxnPriority roachpb.UserPriority // The consistency level of the request. Only set if Txn is nil. ReadConsistency roachpb.ReadConsistencyType diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go index 24d141aa26e0..86f817b5797c 100644 --- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go +++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go @@ -55,7 +55,7 @@ import ( // // The input files use the following DSL: // -// new-txn name= ts=[,] epoch= [uncertainty-limit=[,]] +// new-txn name= ts=[,] [epoch=] [priority] [uncertainty-limit=[,]] // new-request name= txn=|none ts=[,] [priority] [inconsistent] [wait-policy=] [lock-timeout] [max-lock-wait-queue-length=] [poison-policy=[err|wait]] // [=...] (hint: see scanSingleRequest) // sequence req= [eval-kind= p2 && (p1 == enginepb.MaxTxnPriority || p2 == enginepb.MinTxnPriority) { + if CanPushWithPriority(req.PusherTxn.Priority, req.PusheeTxn.Priority) { return true } return false } +// CanPushWithPriority returns true if the given pusher can push the pushee +// based on its priority. +func CanPushWithPriority(pusher, pushee enginepb.TxnPriority) bool { + return pusher > pushee && + (pusher == enginepb.MaxTxnPriority || pushee == enginepb.MinTxnPriority) +} + // isPushed returns whether the PushTxn request has already been // fulfilled by the current transaction state. This may be true // for transactions with pushed timestamps. func isPushed(req *roachpb.PushTxnRequest, txn *roachpb.Transaction) bool { - return (txn.Status.IsFinalized() || - (req.PushType == roachpb.PUSH_TIMESTAMP && req.PushTo.LessEq(txn.WriteTimestamp))) + return txn.Status.IsFinalized() || + (req.PushType == roachpb.PUSH_TIMESTAMP && req.PushTo.LessEq(txn.WriteTimestamp)) } // TxnExpiration computes the timestamp after which the transaction will be