From 83ac099b2d4ad0c9f036458f072bb94acb8da7a2 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 24 Jun 2022 20:45:33 -0400 Subject: [PATCH 1/4] kv: fix conflict resolution for high-priority, non-txn'al requests Fixes #83342. This commit reworks the behavior of non-transactional requests in lock wait-queues when the lock holder or the lock queue waiter has an extreme priority (min or max priority). In such cases, we allow the lock queue waiter to immediately push the lock holder out of its way, either by moving its timestamp to resolve a read-write conflict or aborting it to resolve a write-write conflict. This handling was broken in two ways for non-transactional requests. 1. these requests' priorities were not consulted when deciding whether to immediately push instead of temporarily delaying while waiting in the lock wait-queue. This meant that a high-priority, non-txn request might still wait for 50ms (kv.lock_table.coordinator_liveness_push_delay) before pushing a lower priority lock holder out of its way. 2. worse, it was possible that if these requests were not in the front of a lock wait-queue, they might never push. This was because we had logic that disabled a push if it was not needed for the purposes of checking liveness, detecting deadlocks, or enforcing timeouts. This commit resolves both of these issues. It also improves the testing of transaction priorities in the `kv/kvserver/concurrency` package. Finally, it consolidates the determination of when a pusher should be able to push/abort a pushee into a single location. Release note (bug fix): a bug in transaction conflict resolution which could allow backups to wait on long-running transactions has been resolved. --- pkg/kv/kvserver/batcheval/cmd_push_txn.go | 2 +- pkg/kv/kvserver/batcheval/transaction.go | 7 - .../concurrency/concurrency_control.go | 2 +- .../concurrency/concurrency_manager_test.go | 63 +- .../concurrency/datadriven_util_test.go | 35 ++ .../kvserver/concurrency/lock_table_waiter.go | 36 +- .../concurrency/lock_table_waiter_test.go | 4 +- .../testdata/concurrency_manager/priority | 575 ++++++++++++++++++ pkg/kv/kvserver/replica_send.go | 2 +- pkg/kv/kvserver/txnwait/queue.go | 14 +- 10 files changed, 695 insertions(+), 45 deletions(-) create mode 100644 pkg/kv/kvserver/concurrency/testdata/concurrency_manager/priority diff --git a/pkg/kv/kvserver/batcheval/cmd_push_txn.go b/pkg/kv/kvserver/batcheval/cmd_push_txn.go index 8134aa90d086..cc99f4c72281 100644 --- a/pkg/kv/kvserver/batcheval/cmd_push_txn.go +++ b/pkg/kv/kvserver/batcheval/cmd_push_txn.go @@ -255,7 +255,7 @@ func PushTxn( // If just attempting to cleanup old or already-committed txns, // pusher always fails. pusherWins = false - case CanPushWithPriority(&args.PusherTxn, &reply.PusheeTxn): + case txnwait.CanPushWithPriority(args.PusherTxn.Priority, reply.PusheeTxn.Priority): reason = "pusher has priority" pusherWins = true case args.Force: diff --git a/pkg/kv/kvserver/batcheval/transaction.go b/pkg/kv/kvserver/batcheval/transaction.go index c803ed1a3dd9..bd374efda956 100644 --- a/pkg/kv/kvserver/batcheval/transaction.go +++ b/pkg/kv/kvserver/batcheval/transaction.go @@ -118,13 +118,6 @@ func UpdateAbortSpan( return rec.AbortSpan().Put(ctx, readWriter, ms, txn.ID, &curEntry) } -// CanPushWithPriority returns true if the given pusher can push the pushee -// based on its priority. 
-func CanPushWithPriority(pusher, pushee *roachpb.Transaction) bool {
-	return (pusher.Priority > enginepb.MinTxnPriority && pushee.Priority == enginepb.MinTxnPriority) ||
-		(pusher.Priority == enginepb.MaxTxnPriority && pushee.Priority < pusher.Priority)
-}
-
 // CanCreateTxnRecord determines whether a transaction record can be created for
 // the provided transaction. If not, the function will return an error. If so,
 // the function may modify the provided transaction.
diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go
index a3fe9054a10a..ee640416c7a3 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_control.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -380,7 +380,7 @@ type Request struct {
 	Timestamp hlc.Timestamp
 
 	// The priority of the request. Only set if Txn is nil.
-	Priority roachpb.UserPriority
+	NonTxnPriority roachpb.UserPriority
 
 	// The consistency level of the request. Only set if Txn is nil.
 	ReadConsistency roachpb.ReadConsistencyType
diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go
index 24d141aa26e0..86f817b5797c 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -55,7 +55,7 @@ import (
 //
 // The input files use the following DSL:
 //
-// new-txn      name=<txn-name> ts=<int>[,<int>] epoch=<int> [uncertainty-limit=<int>[,<int>]]
+// new-txn      name=<txn-name> ts=<int>[,<int>] epoch=<int> [priority] [uncertainty-limit=<int>[,<int>]]
 // new-request  name=<req-name> txn=<txn-name>|none ts=<int>[,<int>] [priority] [inconsistent] [wait-policy=<policy>] [lock-timeout] [max-lock-wait-queue-length=<int>] [poison-policy=[err|wait]]
 //   <proto-name> [<field-name>=<field-value>...] (hint: see scanSingleRequest)
 // sequence     req=<req-name> [eval-kind=<pess|opt|pess-after-opt>]
diff --git a/pkg/kv/kvserver/txnwait/queue.go b/pkg/kv/kvserver/txnwait/queue.go
--- a/pkg/kv/kvserver/txnwait/queue.go
+++ b/pkg/kv/kvserver/txnwait/queue.go
-	if p1 > p2 && (p1 == enginepb.MaxTxnPriority || p2 == enginepb.MinTxnPriority) {
+	if CanPushWithPriority(req.PusherTxn.Priority, req.PusheeTxn.Priority) {
 		return true
 	}
 	return false
 }
 
+// CanPushWithPriority returns true if the given pusher can push the pushee
+// based on its priority.
+func CanPushWithPriority(pusher, pushee enginepb.TxnPriority) bool {
+	return pusher > pushee &&
+		(pusher == enginepb.MaxTxnPriority || pushee == enginepb.MinTxnPriority)
+}
+
 // isPushed returns whether the PushTxn request has already been
 // fulfilled by the current transaction state. This may be true
 // for transactions with pushed timestamps.
 func isPushed(req *roachpb.PushTxnRequest, txn *roachpb.Transaction) bool {
-	return (txn.Status.IsFinalized() ||
-		(req.PushType == roachpb.PUSH_TIMESTAMP && req.PushTo.LessEq(txn.WriteTimestamp)))
+	return txn.Status.IsFinalized() ||
+		(req.PushType == roachpb.PUSH_TIMESTAMP && req.PushTo.LessEq(txn.WriteTimestamp))
 }
 
 // TxnExpiration computes the timestamp after which the transaction will be

From f6a7a2ccfe8dbbda93d92051e3d1616376d7194c Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 5 Jul 2022 15:22:49 -0400
Subject: [PATCH 2/4] kv: add txn push reason(s) to trace events

This commit adds the reason(s) for pushing a lock holder to the trace
events emitted while a request waits in a lock wait-queue.
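For reviewers, a minimal standalone sketch of the consolidated priority rule
from the first patch. Only the predicate itself is taken from the diff; the
type and constants below are stand-ins for enginepb.TxnPriority and
enginepb.MinTxnPriority/MaxTxnPriority, with values (0 and math.MaxInt32)
assumed here for illustration:

package main

import (
	"fmt"
	"math"
)

// TxnPriority stands in for enginepb.TxnPriority (an int32 in CockroachDB).
type TxnPriority int32

// Assumed extreme values, mirroring enginepb.MinTxnPriority/MaxTxnPriority.
const (
	MinTxnPriority TxnPriority = 0
	MaxTxnPriority TxnPriority = math.MaxInt32
)

// CanPushWithPriority mirrors the helper consolidated in this patch: a pusher
// may immediately push a pushee only if it outranks the pushee and one of the
// two sits at an extreme priority.
func CanPushWithPriority(pusher, pushee TxnPriority) bool {
	return pusher > pushee &&
		(pusher == MaxTxnPriority || pushee == MinTxnPriority)
}

func main() {
	fmt.Println(CanPushWithPriority(MaxTxnPriority, 100))            // true: max-priority pusher
	fmt.Println(CanPushWithPriority(100, MinTxnPriority))            // true: min-priority pushee
	fmt.Println(CanPushWithPriority(200, 100))                       // false: neither side extreme
	fmt.Println(CanPushWithPriority(MinTxnPriority, MinTxnPriority)) // false: pusher must outrank pushee
}

This captures both directions from the commit message: any request that
outranks a min-priority lock holder may push it immediately, and a
max-priority request may immediately push any lock holder it outranks.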
--- .../kvserver/concurrency/lock_table_waiter.go | 6 +++++ .../testdata/concurrency_manager/basic | 4 ++++ .../clear_abandoned_intents | 7 ++++++ ...doned_intents_without_adding_to_lock_table | 1 + .../testdata/concurrency_manager/deadlocks | 22 +++++++++++++++++++ .../discover_lock_after_lease_race | 2 ++ .../concurrency_manager/discovered_lock | 1 + .../testdata/concurrency_manager/lock_timeout | 5 +++++ .../testdata/concurrency_manager/optimistic | 1 + .../testdata/concurrency_manager/priority | 15 +++++++++++++ .../concurrency_manager/queue_length_exceeded | 9 ++++++++ .../concurrency_manager/range_state_listener | 9 ++++++++ .../testdata/concurrency_manager/uncertainty | 5 +++++ .../testdata/concurrency_manager/update | 4 ++++ .../concurrency_manager/wait_elsewhere | 1 + .../concurrency_manager/wait_policy_error | 2 ++ .../testdata/concurrency_manager/wait_self | 5 +++++ 17 files changed, 99 insertions(+) diff --git a/pkg/kv/kvserver/concurrency/lock_table_waiter.go b/pkg/kv/kvserver/concurrency/lock_table_waiter.go index e72897bd559c..2c4ab296e9d9 100644 --- a/pkg/kv/kvserver/concurrency/lock_table_waiter.go +++ b/pkg/kv/kvserver/concurrency/lock_table_waiter.go @@ -230,6 +230,7 @@ func (w *lockTableWaiterImpl) WaitOn( // If the request doesn't want to perform a delayed push for any // reason, continue waiting without a timer. if !(livenessPush || deadlockPush || timeoutPush || priorityPush) { + log.Eventf(ctx, "not pushing") continue } @@ -260,6 +261,11 @@ func (w *lockTableWaiterImpl) WaitOn( delay = 0 } + log.Eventf(ctx, "pushing after %s for: "+ + "liveness detection = %t, deadlock detection = %t, "+ + "timeout enforcement = %t, priority enforcement = %t", + delay, livenessPush, deadlockPush, timeoutPush, priorityPush) + if delay > 0 { if timer == nil { timer = timeutil.NewTimer() diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic index 88386f5b2af6..8f0cb2071046 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/basic @@ -117,6 +117,7 @@ sequence req=req3 [2] sequence req3: scanning lock table for conflicting locks [2] sequence req3: waiting in lock wait-queues [2] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[2] sequence req3: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [2] sequence req3: pushing timestamp of txn 00000002 above 14.000000000,1 [2] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -204,6 +205,7 @@ sequence req=req5 [2] sequence req5: scanning lock table for conflicting locks [2] sequence req5: waiting in lock wait-queues [2] sequence req5: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[2] sequence req5: pushing after 0s for: liveness detection = true, deadlock detection = false, timeout enforcement = false, priority enforcement = false [2] sequence req5: pushing timestamp of txn 00000002 above 14.000000000,1 [2] sequence req5: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -218,6 +220,7 @@ sequence req=req6 [3] sequence req6: scanning lock table for conflicting locks [3] sequence req6: waiting in lock wait-queues [3] sequence req6: lock wait-queue event: wait for txn 00000002 
holding lock @ key "k" (queuedWriters: 0, queuedReaders: 2) +[3] sequence req6: not pushing [3] sequence req6: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn debug-advance-clock ts=123 @@ -261,6 +264,7 @@ finish req=req6 [4] sequence req7: scanning lock table for conflicting locks [4] sequence req7: waiting in lock wait-queues [4] sequence req7: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[4] sequence req7: pushing after 0s for: liveness detection = true, deadlock detection = false, timeout enforcement = false, priority enforcement = false [4] sequence req7: pushing txn 00000002 to abort [4] sequence req7: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents index 66a9a1f24711..bd57ad431e84 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents @@ -65,6 +65,7 @@ sequence req=req1 [3] sequence req1: scanning lock table for conflicting locks [3] sequence req1: waiting in lock wait-queues [3] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "a" (queuedWriters: 0, queuedReaders: 1) +[3] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req1: pushing timestamp of txn 00000002 above 10.000000000,1 [3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -178,6 +179,7 @@ sequence req=req1 [3] sequence req1: scanning lock table for conflicting locks [3] sequence req1: waiting in lock wait-queues [3] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "a" (queuedWriters: 1, queuedReaders: 0) +[3] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req1: pushing txn 00000002 to abort [3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -354,6 +356,7 @@ sequence req=req1 [4] sequence req1: scanning lock table for conflicting locks [4] sequence req1: waiting in lock wait-queues [4] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "a" (queuedWriters: 0, queuedReaders: 1) +[4] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req1: pushing timestamp of txn 00000002 above 10.000000000,1 [4] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -471,6 +474,7 @@ sequence req=req1 [3] sequence req1: scanning lock table for conflicting locks [3] sequence req1: waiting in lock wait-queues [3] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key "c" (queuedWriters: 1, queuedReaders: 0) +[3] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req1: pushing txn 00000003 to abort [3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -564,6 +568,7 @@ sequence req=req2 [6] sequence 
req2: scanning lock table for conflicting locks [6] sequence req2: waiting in lock wait-queues [6] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key "a" (queuedWriters: 0, queuedReaders: 1) +[6] sequence req2: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [6] sequence req2: pushing timestamp of txn 00000003 above 11.000000000,1 [6] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -603,11 +608,13 @@ on-txn-updated txn=txn3 status=aborted [3] sequence req1: resolving intent "c" for txn 00000003 with ABORTED status [3] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000005 holding lock @ key "e" (queuedWriters: 1, queuedReaders: 0) [3] sequence req1: conflicted with 00000003-0000-0000-0000-000000000000 on "c" for 123.000s +[3] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req1: pushing txn 00000005 to abort [3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction [6] sequence req2: resolving intent "a" for txn 00000003 with ABORTED status [6] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000004 holding lock @ key "b" (queuedWriters: 0, queuedReaders: 1) [6] sequence req2: conflicted with 00000003-0000-0000-0000-000000000000 on "a" for 123.000s +[6] sequence req2: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [6] sequence req2: pushing timestamp of txn 00000004 above 11.000000000,1 [6] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents_without_adding_to_lock_table b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents_without_adding_to_lock_table index 68f89abf6c4d..e04f178c340f 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents_without_adding_to_lock_table +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/clear_abandoned_intents_without_adding_to_lock_table @@ -44,6 +44,7 @@ sequence req=req1 [3] sequence req1: scanning lock table for conflicting locks [3] sequence req1: waiting in lock wait-queues [3] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "a" (queuedWriters: 0, queuedReaders: 1) +[3] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req1: pushing timestamp of txn 00000002 above 10.000000000,1 [3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/deadlocks b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/deadlocks index 7e146f3e5bc6..3c0a649d12ac 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/deadlocks +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/deadlocks @@ -113,6 +113,7 @@ sequence req=req1r [4] sequence req1r: scanning lock table for conflicting locks [4] sequence req1r: waiting in lock wait-queues [4] sequence req1r: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "b" (queuedWriters: 0, queuedReaders: 1) 
+[4] sequence req1r: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req1r: pushing timestamp of txn 00000002 above 10.000000000,1 [4] sequence req1r: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -123,6 +124,7 @@ sequence req=req2r [5] sequence req2r: scanning lock table for conflicting locks [5] sequence req2r: waiting in lock wait-queues [5] sequence req2r: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key "c" (queuedWriters: 0, queuedReaders: 1) +[5] sequence req2r: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [5] sequence req2r: pushing timestamp of txn 00000003 above 10.000000000,1 [5] sequence req2r: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -135,6 +137,7 @@ sequence req=req3r [6] sequence req3r: scanning lock table for conflicting locks [6] sequence req3r: waiting in lock wait-queues [6] sequence req3r: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "a" (queuedWriters: 0, queuedReaders: 1) +[6] sequence req3r: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [6] sequence req3r: pushing timestamp of txn 00000001 above 10.000000000,1 [6] sequence req3r: blocked on select in concurrency_test.(*cluster).PushTransaction [6] sequence req3r: dependency cycle detected 00000003->00000001->00000002->00000003 @@ -326,6 +329,7 @@ sequence req=req4w [4] sequence req4w: scanning lock table for conflicting locks [4] sequence req4w: waiting in lock wait-queues [4] sequence req4w: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "a" (queuedWriters: 1, queuedReaders: 0) +[4] sequence req4w: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req4w: pushing txn 00000001 to abort [4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -336,6 +340,7 @@ sequence req=req1w2 [5] sequence req1w2: scanning lock table for conflicting locks [5] sequence req1w2: waiting in lock wait-queues [5] sequence req1w2: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "b" (queuedWriters: 1, queuedReaders: 0) +[5] sequence req1w2: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [5] sequence req1w2: pushing txn 00000002 to abort [5] sequence req1w2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -346,6 +351,7 @@ sequence req=req2w2 [6] sequence req2w2: scanning lock table for conflicting locks [6] sequence req2w2: waiting in lock wait-queues [6] sequence req2w2: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key "c" (queuedWriters: 1, queuedReaders: 0) +[6] sequence req2w2: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [6] sequence req2w2: pushing txn 00000003 to abort [6] sequence req2w2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -358,6 +364,7 @@ sequence req=req3w2 [7] sequence req3w2: scanning lock table for conflicting locks [7] sequence req3w2: waiting in lock wait-queues [7] sequence req3w2: lock wait-queue 
event: wait for txn 00000001 holding lock @ key "a" (queuedWriters: 2, queuedReaders: 0) +[7] sequence req3w2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [7] sequence req3w2: pushing txn 00000001 to abort [7] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction [7] sequence req3w2: dependency cycle detected 00000003->00000001->00000002->00000003 @@ -399,6 +406,7 @@ on-txn-updated txn=txn1 status=aborted [7] sequence req3w2: resolving intent "a" for txn 00000001 with ABORTED status [7] sequence req3w2: lock wait-queue event: wait for (distinguished) txn 00000004 running request @ key "a" (queuedWriters: 1, queuedReaders: 0) [7] sequence req3w2: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 0.000s +[7] sequence req3w2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [7] sequence req3w2: pushing txn 00000004 to detect request deadlock [7] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -552,6 +560,7 @@ sequence req=req4w [4] sequence req4w: scanning lock table for conflicting locks [4] sequence req4w: waiting in lock wait-queues [4] sequence req4w: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "b" (queuedWriters: 1, queuedReaders: 0) +[4] sequence req4w: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req4w: pushing txn 00000002 to abort [4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -561,6 +570,7 @@ on-txn-updated txn=txn2 status=committed [4] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status [4] sequence req4w: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key "c" (queuedWriters: 1, queuedReaders: 0) [4] sequence req4w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 0.000s +[4] sequence req4w: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req4w: pushing txn 00000003 to abort [4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -597,6 +607,7 @@ sequence req=req1w2 [5] sequence req1w2: scanning lock table for conflicting locks [5] sequence req1w2: waiting in lock wait-queues [5] sequence req1w2: lock wait-queue event: wait for (distinguished) txn 00000004 running request @ key "b" (queuedWriters: 1, queuedReaders: 0) +[5] sequence req1w2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [5] sequence req1w2: pushing txn 00000004 to detect request deadlock [5] sequence req1w2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -609,6 +620,7 @@ sequence req=req3w2 [6] sequence req3w2: scanning lock table for conflicting locks [6] sequence req3w2: waiting in lock wait-queues [6] sequence req3w2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "a" (queuedWriters: 1, queuedReaders: 0) +[6] sequence req3w2: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [6] sequence req3w2: pushing txn 00000001 to abort [6] sequence req3w2: blocked on select in 
concurrency_test.(*cluster).PushTransaction [6] sequence req3w2: dependency cycle detected 00000003->00000001->00000004->00000003 @@ -786,6 +798,7 @@ sequence req=req4w [4] sequence req4w: scanning lock table for conflicting locks [4] sequence req4w: waiting in lock wait-queues [4] sequence req4w: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "b" (queuedWriters: 1, queuedReaders: 0) +[4] sequence req4w: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req4w: pushing txn 00000002 to abort [4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -795,6 +808,7 @@ on-txn-updated txn=txn2 status=committed [4] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status [4] sequence req4w: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key "c" (queuedWriters: 1, queuedReaders: 0) [4] sequence req4w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 0.000s +[4] sequence req4w: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req4w: pushing txn 00000003 to abort [4] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -831,6 +845,7 @@ sequence req=req1w2 [5] sequence req1w2: scanning lock table for conflicting locks [5] sequence req1w2: waiting in lock wait-queues [5] sequence req1w2: lock wait-queue event: wait for (distinguished) txn 00000004 running request @ key "b" (queuedWriters: 1, queuedReaders: 0) +[5] sequence req1w2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [5] sequence req1w2: pushing txn 00000004 to detect request deadlock [5] sequence req1w2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -843,6 +858,7 @@ sequence req=req3w2 [6] sequence req3w2: scanning lock table for conflicting locks [6] sequence req3w2: waiting in lock wait-queues [6] sequence req3w2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "a" (queuedWriters: 1, queuedReaders: 0) +[6] sequence req3w2: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [6] sequence req3w2: pushing txn 00000001 to abort [6] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction [6] sequence req3w2: dependency cycle detected 00000003->00000001->00000004->00000003 @@ -1032,6 +1048,7 @@ sequence req=req5w [4] sequence req5w: scanning lock table for conflicting locks [4] sequence req5w: waiting in lock wait-queues [4] sequence req5w: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "b" (queuedWriters: 1, queuedReaders: 0) +[4] sequence req5w: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req5w: pushing txn 00000002 to abort [4] sequence req5w: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -1042,6 +1059,7 @@ sequence req=req4w [5] sequence req4w: scanning lock table for conflicting locks [5] sequence req4w: waiting in lock wait-queues [5] sequence req4w: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "a" (queuedWriters: 1, queuedReaders: 0) +[5] sequence req4w: 
pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [5] sequence req4w: pushing txn 00000001 to abort [5] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -1051,6 +1069,7 @@ on-txn-updated txn=txn1 status=committed [5] sequence req4w: resolving intent "a" for txn 00000001 with COMMITTED status [5] sequence req4w: lock wait-queue event: wait for txn 00000002 holding lock @ key "b" (queuedWriters: 2, queuedReaders: 0) [5] sequence req4w: conflicted with 00000001-0000-0000-0000-000000000000 on "a" for 0.000s +[5] sequence req4w: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [5] sequence req4w: pushing txn 00000002 to abort [5] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -1060,11 +1079,13 @@ on-txn-updated txn=txn2 status=committed [4] sequence req5w: resolving intent "b" for txn 00000002 with COMMITTED status [4] sequence req5w: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key "c" (queuedWriters: 1, queuedReaders: 0) [4] sequence req5w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 0.000s +[4] sequence req5w: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req5w: pushing txn 00000003 to abort [4] sequence req5w: blocked on select in concurrency_test.(*cluster).PushTransaction [5] sequence req4w: resolving intent "b" for txn 00000002 with COMMITTED status [5] sequence req4w: lock wait-queue event: wait for (distinguished) txn 00000005 running request @ key "b" (queuedWriters: 1, queuedReaders: 0) [5] sequence req4w: conflicted with 00000002-0000-0000-0000-000000000000 on "b" for 0.000s +[5] sequence req4w: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [5] sequence req4w: pushing txn 00000005 to detect request deadlock [5] sequence req4w: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -1102,6 +1123,7 @@ sequence req=req3w2 [6] sequence req3w2: scanning lock table for conflicting locks [6] sequence req3w2: waiting in lock wait-queues [6] sequence req3w2: lock wait-queue event: wait for (distinguished) txn 00000004 running request @ key "a" (queuedWriters: 1, queuedReaders: 0) +[6] sequence req3w2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [6] sequence req3w2: pushing txn 00000004 to detect request deadlock [6] sequence req3w2: blocked on select in concurrency_test.(*cluster).PushTransaction [6] sequence req3w2: dependency cycle detected 00000003->00000004->00000005->00000003 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discover_lock_after_lease_race b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discover_lock_after_lease_race index 6e4aa250c7c7..759ff68bde33 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discover_lock_after_lease_race +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discover_lock_after_lease_race @@ -150,6 +150,7 @@ sequence req=req4 [5] sequence req4: scanning lock table for conflicting locks [5] sequence req4: waiting in lock wait-queues [5] sequence req4: lock wait-queue event: wait for (distinguished) txn 00000003 
holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[5] sequence req4: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [5] sequence req4: pushing timestamp of txn 00000003 above 10.000000000,0 [5] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -176,6 +177,7 @@ sequence req=req2 [7] sequence req2: scanning lock table for conflicting locks [7] sequence req2: waiting in lock wait-queues [7] sequence req2: lock wait-queue event: wait for txn 00000003 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 2) +[7] sequence req2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [7] sequence req2: pushing timestamp of txn 00000003 above 10.000000000,0 [7] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock index 0028b54224fd..b2651e3a0c85 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/discovered_lock @@ -39,6 +39,7 @@ sequence req=req1 [3] sequence req1: scanning lock table for conflicting locks [3] sequence req1: waiting in lock wait-queues [3] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[3] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req1: pushing timestamp of txn 00000001 above 12.000000000,1 [3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/lock_timeout b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/lock_timeout index 0021c3d0365f..e3aad4fe7299 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/lock_timeout +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/lock_timeout @@ -71,6 +71,7 @@ sequence req=req3 [3] sequence req3: scanning lock table for conflicting locks [3] sequence req3: waiting in lock wait-queues [3] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k2" (queuedWriters: 1, queuedReaders: 0) +[3] sequence req3: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req3: pushing txn 00000001 to abort [3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -104,6 +105,7 @@ sequence req=reqTimeout1 [4] sequence reqTimeout1: scanning lock table for conflicting locks [4] sequence reqTimeout1: waiting in lock wait-queues [4] sequence reqTimeout1: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[4] sequence reqTimeout1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = true, priority enforcement = false [4] sequence reqTimeout1: pushing txn 00000001 to check if abandoned [4] sequence reqTimeout1: pushee not abandoned [4] sequence reqTimeout1: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s @@ -120,6 +122,7 @@ on-txn-updated 
txn=txn1 status=committed [3] sequence req3: resolving intent "k2" for txn 00000001 with COMMITTED status [3] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k3" (queuedWriters: 1, queuedReaders: 0) [3] sequence req3: conflicted with 00000001-0000-0000-0000-000000000000 on "k2" for 0.000s +[3] sequence req3: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req3: pushing txn 00000002 to abort [3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -163,6 +166,7 @@ sequence req=reqTimeout2 [6] sequence reqTimeout2: scanning lock table for conflicting locks [6] sequence reqTimeout2: waiting in lock wait-queues [6] sequence reqTimeout2: lock wait-queue event: wait for (distinguished) txn 00000003 running request @ key "k2" (queuedWriters: 1, queuedReaders: 0) +[6] sequence reqTimeout2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = true, priority enforcement = false [6] sequence reqTimeout2: conflicted with 00000003-0000-0000-0000-000000000000 on "k2" for 0.000s [6] sequence reqTimeout2: sequencing complete, returned error: conflicting intents on "k2" [reason=lock_timeout] @@ -194,6 +198,7 @@ sequence req=reqTimeout3 [9] sequence reqTimeout3: scanning lock table for conflicting locks [9] sequence reqTimeout3: waiting in lock wait-queues [9] sequence reqTimeout3: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k4" (queuedWriters: 0, queuedReaders: 1) +[9] sequence reqTimeout3: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = true, priority enforcement = false [9] sequence reqTimeout3: pushing txn 00000002 to check if abandoned [9] sequence reqTimeout3: pushee not abandoned [9] sequence reqTimeout3: conflicted with 00000002-0000-0000-0000-000000000000 on "k4" for 0.000s diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic index 93f49f153e8f..f9e399c75752 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/optimistic @@ -93,6 +93,7 @@ sequence req=req3 eval-kind=pess-after-opt [4] sequence req3: scanning lock table for conflicting locks [4] sequence req3: waiting in lock wait-queues [4] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "d" (queuedWriters: 0, queuedReaders: 1) +[4] sequence req3: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req3: pushing timestamp of txn 00000001 above 12.000000000,1 [4] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/priority b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/priority index d15288317477..f9da4e710abb 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/priority +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/priority @@ -139,6 +139,7 @@ sequence req=req4 [4] sequence req4: scanning lock table for conflicting locks [4] sequence req4: waiting in lock wait-queues [4] sequence req4: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "kLow1" 
(queuedWriters: 0, queuedReaders: 1) +[4] sequence req4: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req4: pushing timestamp of txn 00000001 above 10.000000000,1 [4] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -155,6 +156,7 @@ sequence req=req5 [5] sequence req5: scanning lock table for conflicting locks [5] sequence req5: waiting in lock wait-queues [5] sequence req5: lock wait-queue event: wait for txn 00000001 holding lock @ key "kLow1" (queuedWriters: 0, queuedReaders: 2) +[5] sequence req5: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = true [5] sequence req5: pushing timestamp of txn 00000001 above 10.000000000,1 [5] sequence req5: pusher pushed pushee to 10.000000000,2 [5] sequence req5: resolving intent "kLow1" for txn 00000001 with PENDING status and clock observation {1 123.000000000,3} @@ -210,6 +212,7 @@ sequence req=req6 [6] sequence req6: scanning lock table for conflicting locks [6] sequence req6: waiting in lock wait-queues [6] sequence req6: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "kLow2" (queuedWriters: 1, queuedReaders: 0) +[6] sequence req6: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [6] sequence req6: pushing txn 00000001 to abort [6] sequence req6: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -226,11 +229,13 @@ sequence req=req7 [7] sequence req7: scanning lock table for conflicting locks [7] sequence req7: waiting in lock wait-queues [7] sequence req7: lock wait-queue event: wait for txn 00000001 holding lock @ key "kLow2" (queuedWriters: 2, queuedReaders: 0) +[7] sequence req7: pushing after 0s for: liveness detection = false, deadlock detection = false, timeout enforcement = false, priority enforcement = true [7] sequence req7: pushing txn 00000001 to abort [7] sequence req7: pusher aborted pushee [7] sequence req7: resolving intent "kLow2" for txn 00000001 with ABORTED status [7] sequence req7: lock wait-queue event: wait for (distinguished) txn 00000004 running request @ key "kLow2" (queuedWriters: 1, queuedReaders: 0) [7] sequence req7: conflicted with 00000001-0000-0000-0000-000000000000 on "kLow2" for 0.000s +[7] sequence req7: pushing after 0s for: liveness detection = false, deadlock detection = false, timeout enforcement = false, priority enforcement = true [7] sequence req7: pushing txn 00000004 to detect request deadlock [7] sequence req7: pusher aborted pushee [7] sequence req7: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn @@ -284,6 +289,7 @@ sequence req=req8 [8] sequence req8: scanning lock table for conflicting locks [8] sequence req8: waiting in lock wait-queues [8] sequence req8: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "kNormal1" (queuedWriters: 0, queuedReaders: 1) +[8] sequence req8: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [8] sequence req8: pushing timestamp of txn 00000002 above 10.000000000,1 [8] sequence req8: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -300,6 +306,7 @@ sequence req=req9 [9] sequence req9: scanning lock table for conflicting locks [9] sequence req9: waiting in lock wait-queues [9] 
sequence req9: lock wait-queue event: wait for txn 00000002 holding lock @ key "kNormal1" (queuedWriters: 0, queuedReaders: 2) +[9] sequence req9: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = true [9] sequence req9: pushing timestamp of txn 00000002 above 10.000000000,1 [9] sequence req9: pusher pushed pushee to 10.000000000,2 [9] sequence req9: resolving intent "kNormal1" for txn 00000002 with PENDING status and clock observation {1 123.000000000,8} @@ -353,6 +360,7 @@ sequence req=req10 [10] sequence req10: scanning lock table for conflicting locks [10] sequence req10: waiting in lock wait-queues [10] sequence req10: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "kNormal2" (queuedWriters: 1, queuedReaders: 0) +[10] sequence req10: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [10] sequence req10: pushing txn 00000002 to abort [10] sequence req10: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -369,11 +377,13 @@ sequence req=req11 [11] sequence req11: scanning lock table for conflicting locks [11] sequence req11: waiting in lock wait-queues [11] sequence req11: lock wait-queue event: wait for txn 00000002 holding lock @ key "kNormal2" (queuedWriters: 2, queuedReaders: 0) +[11] sequence req11: pushing after 0s for: liveness detection = false, deadlock detection = false, timeout enforcement = false, priority enforcement = true [11] sequence req11: pushing txn 00000002 to abort [11] sequence req11: pusher aborted pushee [11] sequence req11: resolving intent "kNormal2" for txn 00000002 with ABORTED status [11] sequence req11: lock wait-queue event: wait for (distinguished) txn 00000007 running request @ key "kNormal2" (queuedWriters: 1, queuedReaders: 0) [11] sequence req11: conflicted with 00000002-0000-0000-0000-000000000000 on "kNormal2" for 0.000s +[11] sequence req11: pushing after 0s for: liveness detection = false, deadlock detection = false, timeout enforcement = false, priority enforcement = true [11] sequence req11: pushing txn 00000007 to detect request deadlock [11] sequence req11: pusher aborted pushee [11] sequence req11: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn @@ -425,6 +435,7 @@ sequence req=req12 [12] sequence req12: scanning lock table for conflicting locks [12] sequence req12: waiting in lock wait-queues [12] sequence req12: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key "kHigh1" (queuedWriters: 0, queuedReaders: 1) +[12] sequence req12: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [12] sequence req12: pushing timestamp of txn 00000003 above 10.000000000,1 [12] sequence req12: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -435,6 +446,7 @@ sequence req=req13 [13] sequence req13: scanning lock table for conflicting locks [13] sequence req13: waiting in lock wait-queues [13] sequence req13: lock wait-queue event: wait for txn 00000003 holding lock @ key "kHigh1" (queuedWriters: 0, queuedReaders: 2) +[13] sequence req13: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [13] sequence req13: pushing timestamp of txn 00000003 above 10.000000000,1 [13] sequence req13: blocked on select in 
concurrency_test.(*cluster).PushTransaction @@ -496,6 +508,7 @@ sequence req=req14 [14] sequence req14: scanning lock table for conflicting locks [14] sequence req14: waiting in lock wait-queues [14] sequence req14: lock wait-queue event: wait for (distinguished) txn 00000003 holding lock @ key "kHigh2" (queuedWriters: 1, queuedReaders: 0) +[14] sequence req14: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [14] sequence req14: pushing txn 00000003 to abort [14] sequence req14: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -506,6 +519,7 @@ sequence req=req15 [15] sequence req15: scanning lock table for conflicting locks [15] sequence req15: waiting in lock wait-queues [15] sequence req15: lock wait-queue event: wait for txn 00000003 holding lock @ key "kHigh2" (queuedWriters: 2, queuedReaders: 0) +[15] sequence req15: not pushing [15] sequence req15: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn on-txn-updated txn=txnHighPushee status=committed @@ -519,6 +533,7 @@ on-txn-updated txn=txnHighPushee status=committed [14] sequence req14: sequencing complete, returned guard [15] sequence req15: lock wait-queue event: wait for (distinguished) txn 00000008 running request @ key "kHigh2" (queuedWriters: 1, queuedReaders: 0) [15] sequence req15: conflicted with 00000003-0000-0000-0000-000000000000 on "kHigh2" for 0.000s +[15] sequence req15: pushing after 0s for: liveness detection = false, deadlock detection = false, timeout enforcement = false, priority enforcement = true [15] sequence req15: pushing txn 00000008 to detect request deadlock [15] sequence req15: pusher aborted pushee [15] sequence req15: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/queue_length_exceeded b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/queue_length_exceeded index f447e77b65f6..c2327627d355 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/queue_length_exceeded +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/queue_length_exceeded @@ -51,6 +51,7 @@ sequence req=req2 [2] sequence req2: scanning lock table for conflicting locks [2] sequence req2: waiting in lock wait-queues [2] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[2] sequence req2: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [2] sequence req2: pushing txn 00000001 to abort [2] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -65,6 +66,7 @@ sequence req=req3 [3] sequence req3: scanning lock table for conflicting locks [3] sequence req3: waiting in lock wait-queues [3] sequence req3: lock wait-queue event: wait for txn 00000001 holding lock @ key "k" (queuedWriters: 2, queuedReaders: 0) +[3] sequence req3: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req3: pushing txn 00000001 to abort [3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -79,6 +81,7 @@ sequence req=req4 [4] sequence req4: scanning lock table for conflicting locks [4] sequence req4: waiting in lock wait-queues [4] sequence req4: lock wait-queue event: wait for txn 00000001 holding lock @ key "k" 
(queuedWriters: 3, queuedReaders: 0) +[4] sequence req4: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req4: pushing txn 00000001 to abort [4] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -111,6 +114,7 @@ sequence req=req5r [5] sequence req5r: scanning lock table for conflicting locks [5] sequence req5r: waiting in lock wait-queues [5] sequence req5r: lock wait-queue event: wait for txn 00000001 holding lock @ key "k" (queuedWriters: 3, queuedReaders: 1) +[5] sequence req5r: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [5] sequence req5r: pushing timestamp of txn 00000001 above 10.000000000,1 [5] sequence req5r: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -126,11 +130,13 @@ on-txn-updated txn=txn1 status=committed [3] sequence req3: resolving intent "k" for txn 00000001 with COMMITTED status [3] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000002 running request @ key "k" (queuedWriters: 2, queuedReaders: 0) [3] sequence req3: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s +[3] sequence req3: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req3: pushing txn 00000002 to detect request deadlock [3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction [4] sequence req4: resolving intent "k" for txn 00000001 with COMMITTED status [4] sequence req4: lock wait-queue event: wait for txn 00000002 running request @ key "k" (queuedWriters: 2, queuedReaders: 0) [4] sequence req4: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 0.000s +[4] sequence req4: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req4: pushing txn 00000002 to detect request deadlock [4] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction [5] sequence req5r: resolving intent "k" for txn 00000001 with COMMITTED status @@ -148,9 +154,11 @@ on-lock-acquired req=req2 key=k ---- [-] acquire lock: txn 00000002 @ k [3] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 2, queuedReaders: 0) +[3] sequence req3: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req3: pushing txn 00000002 to abort [3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction [4] sequence req4: lock wait-queue event: wait for txn 00000002 holding lock @ key "k" (queuedWriters: 2, queuedReaders: 0) +[4] sequence req4: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req4: pushing txn 00000002 to abort [4] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -203,6 +211,7 @@ on-txn-updated txn=txn2 status=aborted [4] sequence req4: resolving intent "k" for txn 00000002 with ABORTED status [4] sequence req4: lock wait-queue event: wait for (distinguished) txn 00000003 running request @ key "k" (queuedWriters: 1, queuedReaders: 0) [4] sequence req4: conflicted with 
00000002-0000-0000-0000-000000000000 on "k" for 0.000s +[4] sequence req4: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req4: pushing txn 00000003 to detect request deadlock [4] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener index a19b8332d311..96b5c94f8fd5 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/range_state_listener @@ -95,6 +95,7 @@ sequence req=req2 [2] sequence req2: scanning lock table for conflicting locks [2] sequence req2: waiting in lock wait-queues [2] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[2] sequence req2: pushing after 1h0m0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [2] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn debug-lock-table @@ -179,6 +180,7 @@ sequence req=req2 [7] sequence req2: scanning lock table for conflicting locks [7] sequence req2: waiting in lock wait-queues [7] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[7] sequence req2: pushing after 1h0m0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [7] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn new-request name=reqRes1 txn=none ts=10,1 @@ -297,6 +299,7 @@ sequence req=req3 [13] sequence req3: scanning lock table for conflicting locks [13] sequence req3: waiting in lock wait-queues [13] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[13] sequence req3: pushing after 1h0m0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [13] sequence req3: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn new-request name=reqRes2 txn=none ts=10,1 @@ -411,6 +414,7 @@ sequence req=req2 [2] sequence req2: scanning lock table for conflicting locks [2] sequence req2: waiting in lock wait-queues [2] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[2] sequence req2: pushing after 1h0m0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [2] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn debug-lock-table @@ -458,6 +462,7 @@ sequence req=req2 [4] sequence req2: scanning lock table for conflicting locks [4] sequence req2: waiting in lock wait-queues [4] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[4] sequence req2: pushing after 1h0m0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn new-request name=reqRes1 txn=none ts=10,1 @@ -586,6 +591,7 @@ 
sequence req=req2 [2] sequence req2: scanning lock table for conflicting locks [2] sequence req2: waiting in lock wait-queues [2] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[2] sequence req2: pushing after 1h0m0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [2] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn sequence req=req3 @@ -682,6 +688,7 @@ sequence req=req2 [8] sequence req2: scanning lock table for conflicting locks [8] sequence req2: waiting in lock wait-queues [8] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[8] sequence req2: pushing after 1h0m0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [8] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn new-request name=reqRes1 txn=none ts=10,1 @@ -796,6 +803,7 @@ sequence req=req2 [2] sequence req2: scanning lock table for conflicting locks [2] sequence req2: waiting in lock wait-queues [2] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[2] sequence req2: pushing after 1h0m0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [2] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn debug-lock-table @@ -843,6 +851,7 @@ sequence req=req2 [4] sequence req2: scanning lock table for conflicting locks [4] sequence req2: waiting in lock wait-queues [4] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[4] sequence req2: pushing after 1h0m0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence req2: blocked on select in concurrency.(*lockTableWaiterImpl).WaitOn new-request name=reqRes1 txn=none ts=10,1 diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty index 500df03eaf04..482c4c1e53f9 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/uncertainty @@ -41,6 +41,7 @@ sequence req=req1 [3] sequence req1: scanning lock table for conflicting locks [3] sequence req1: waiting in lock wait-queues [3] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[3] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req1: pushing timestamp of txn 00000001 above 15.000000000,1 [3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -107,6 +108,7 @@ sequence req=req1 [3] sequence req1: scanning lock table for conflicting locks [3] sequence req1: waiting in lock wait-queues [3] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[3] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, 
priority enforcement = false [3] sequence req1: pushing timestamp of txn 00000001 above 135.000000000,0 [3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -176,6 +178,7 @@ sequence req=req1 [3] sequence req1: scanning lock table for conflicting locks [3] sequence req1: waiting in lock wait-queues [3] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[3] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req1: pushing timestamp of txn 00000001 above 150.000000000,1? [3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -251,6 +254,7 @@ sequence req=req1 [3] sequence req1: scanning lock table for conflicting locks [3] sequence req1: waiting in lock wait-queues [3] sequence req1: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[3] sequence req1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req1: pushing timestamp of txn 00000001 above 15.000000000,1 [3] sequence req1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -289,6 +293,7 @@ sequence req=req2-retry [5] sequence req2-retry: scanning lock table for conflicting locks [5] sequence req2-retry: waiting in lock wait-queues [5] sequence req2-retry: lock wait-queue event: wait for txn 00000001 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 2) +[5] sequence req2-retry: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [5] sequence req2-retry: pushing timestamp of txn 00000001 above 15.000000000,1 [5] sequence req2-retry: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/update b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/update index 2b717ae517c6..be8a1e4816f7 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/update +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/update @@ -50,6 +50,7 @@ sequence req=req2 [2] sequence req2: scanning lock table for conflicting locks [2] sequence req2: waiting in lock wait-queues [2] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[2] sequence req2: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [2] sequence req2: pushing timestamp of txn 00000001 above 12.000000000,1 [2] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -171,6 +172,7 @@ sequence req=req2 [2] sequence req2: scanning lock table for conflicting locks [2] sequence req2: waiting in lock wait-queues [2] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[2] sequence req2: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [2] sequence req2: pushing timestamp of txn 00000001 above 12.000000000,1 [2] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -301,6 +303,7 @@ 
sequence req=req2 [2] sequence req2: scanning lock table for conflicting locks [2] sequence req2: waiting in lock wait-queues [2] sequence req2: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 0, queuedReaders: 1) +[2] sequence req2: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [2] sequence req2: pushing timestamp of txn 00000001 above 12.000000000,1 [2] sequence req2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -359,6 +362,7 @@ sequence req=req4 [3] sequence req4: scanning lock table for conflicting locks [3] sequence req4: waiting in lock wait-queues [3] sequence req4: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[3] sequence req4: pushing after 0s for: liveness detection = true, deadlock detection = false, timeout enforcement = false, priority enforcement = false [3] sequence req4: pushing txn 00000001 to abort [3] sequence req4: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_elsewhere b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_elsewhere index 3e458d8d211c..159997ac386e 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_elsewhere +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_elsewhere @@ -63,6 +63,7 @@ sequence req=reqWaiter [4] sequence reqWaiter: scanning lock table for conflicting locks [4] sequence reqWaiter: waiting in lock wait-queues [4] sequence reqWaiter: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[4] sequence reqWaiter: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence reqWaiter: pushing txn 00000001 to abort [4] sequence reqWaiter: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_policy_error b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_policy_error index 99afa5ab6995..5dd811f7ca23 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_policy_error +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_policy_error @@ -71,6 +71,7 @@ sequence req=req3 [3] sequence req3: scanning lock table for conflicting locks [3] sequence req3: waiting in lock wait-queues [3] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000001 holding lock @ key "k2" (queuedWriters: 1, queuedReaders: 0) +[3] sequence req3: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence req3: pushing txn 00000001 to abort [3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -123,6 +124,7 @@ on-txn-updated txn=txn1 status=committed [3] sequence req3: resolving intent "k2" for txn 00000001 with COMMITTED status [3] sequence req3: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k3" (queuedWriters: 1, queuedReaders: 0) [3] sequence req3: conflicted with 00000001-0000-0000-0000-000000000000 on "k2" for 123.000s +[3] sequence req3: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, 
priority enforcement = false [3] sequence req3: pushing txn 00000002 to abort [3] sequence req3: blocked on select in concurrency_test.(*cluster).PushTransaction diff --git a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self index 01579ad9e5a6..7a90b9125e54 100644 --- a/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self +++ b/pkg/kv/kvserver/concurrency/testdata/concurrency_manager/wait_self @@ -48,6 +48,7 @@ sequence req=reqTxn1 [2] sequence reqTxn1: scanning lock table for conflicting locks [2] sequence reqTxn1: waiting in lock wait-queues [2] sequence reqTxn1: lock wait-queue event: wait for (distinguished) txn 00000002 holding lock @ key "k" (queuedWriters: 1, queuedReaders: 0) +[2] sequence reqTxn1: pushing after 0s for: liveness detection = true, deadlock detection = true, timeout enforcement = false, priority enforcement = false [2] sequence reqTxn1: pushing txn 00000002 to abort [2] sequence reqTxn1: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -58,6 +59,7 @@ sequence req=reqTxnMiddle [3] sequence reqTxnMiddle: scanning lock table for conflicting locks [3] sequence reqTxnMiddle: waiting in lock wait-queues [3] sequence reqTxnMiddle: lock wait-queue event: wait for txn 00000002 holding lock @ key "k" (queuedWriters: 2, queuedReaders: 0) +[3] sequence reqTxnMiddle: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence reqTxnMiddle: pushing txn 00000002 to abort [3] sequence reqTxnMiddle: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -68,6 +70,7 @@ sequence req=reqTxn2 [4] sequence reqTxn2: scanning lock table for conflicting locks [4] sequence reqTxn2: waiting in lock wait-queues [4] sequence reqTxn2: lock wait-queue event: wait for txn 00000002 holding lock @ key "k" (queuedWriters: 3, queuedReaders: 0) +[4] sequence reqTxn2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [4] sequence reqTxn2: pushing txn 00000002 to abort [4] sequence reqTxn2: blocked on select in concurrency_test.(*cluster).PushTransaction @@ -86,6 +89,7 @@ on-txn-updated txn=txnOld status=committed [3] sequence reqTxnMiddle: resolving intent "k" for txn 00000002 with COMMITTED status [3] sequence reqTxnMiddle: lock wait-queue event: wait for (distinguished) txn 00000001 running request @ key "k" (queuedWriters: 2, queuedReaders: 0) [3] sequence reqTxnMiddle: conflicted with 00000002-0000-0000-0000-000000000000 on "k" for 123.000s +[3] sequence reqTxnMiddle: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = false, priority enforcement = false [3] sequence reqTxnMiddle: pushing txn 00000001 to detect request deadlock [3] sequence reqTxnMiddle: blocked on select in concurrency_test.(*cluster).PushTransaction [4] sequence reqTxn2: resolving intent "k" for txn 00000002 with COMMITTED status @@ -120,6 +124,7 @@ finish req=reqTxn1 [3] sequence reqTxnMiddle: sequencing complete, returned guard [4] sequence reqTxn2: lock wait-queue event: wait for (distinguished) txn 00000003 running request @ key "k" (queuedWriters: 1, queuedReaders: 0) [4] sequence reqTxn2: conflicted with 00000001-0000-0000-0000-000000000000 on "k" for 123.000s +[4] sequence reqTxn2: pushing after 0s for: liveness detection = false, deadlock detection = true, timeout enforcement = 
false, priority enforcement = false [4] sequence reqTxn2: pushing txn 00000003 to detect request deadlock [4] sequence reqTxn2: blocked on select in concurrency_test.(*cluster).PushTransaction From 96612190cc4f91f9894a666a8706b293d87e783a Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Thu, 30 Jun 2022 15:23:06 -0700 Subject: [PATCH 3/4] util/mon: pass reserved account by reference This commit changes memory monitor startup so that the `reserved` memory account is now passed by reference. Previously, because the account was passed by value, stopping the monitor released a copy, letting the actual "reserved" memory account drift out of sync with its user. This is now fixed. That said, this bug has no real production impact: the "reserved" feature is used in only a handful of places, and those "reserved" memory accounts are not reused across different usages. Additionally, this commit renames `MakeStandaloneBudget` to `NewStandaloneBudget` (since it now returns a reference) and adds a separate "start" method for callers that don't want to pre-reserve anything when starting up a monitor. Release note: None --- pkg/base/config.go | 2 +- pkg/base/test_server_args.go | 2 +- pkg/ccl/backupccl/backup_test.go | 4 +- .../changefeedccl/changefeed_processors.go | 2 +- pkg/ccl/changefeedccl/changefeed_test.go | 2 +- .../kvevent/blocking_buffer_test.go | 2 +- pkg/kv/bulk/kv_buf_test.go | 4 +- .../kvstreamer/results_buffer_test.go | 2 +- pkg/kv/kvclient/kvstreamer/streamer_test.go | 2 +- .../rangefeed/db_adapter_external_test.go | 2 +- pkg/kv/kvserver/rangefeed/budget.go | 4 +- pkg/kv/kvserver/rangefeed/budget_test.go | 12 ++--- pkg/kv/kvserver/rangefeed/processor_test.go | 4 +- pkg/kv/kvserver/ts_maintenance_queue_test.go | 2 +- pkg/server/server.go | 2 +- pkg/server/server_sql.go | 6 +-- pkg/sql/catalog/descs/collection_test.go | 4 +- pkg/sql/closed_session_cache.go | 2 +- .../colexec/colexecargs/monitor_registry.go | 2 +- pkg/sql/colflow/draining_test.go | 2 +- pkg/sql/colflow/explain_vec.go | 2 +- pkg/sql/colflow/vectorized_flow_space_test.go | 8 +-- pkg/sql/conn_executor.go | 12 ++--- pkg/sql/conn_executor_internal_test.go | 4 +- pkg/sql/delete_preserving_index_test.go | 2 +- pkg/sql/distsql/server.go | 4 +- pkg/sql/distsql_running.go | 4 +- pkg/sql/execinfra/processorsbase.go | 4 +- pkg/sql/execinfra/testutils.go | 4 +- pkg/sql/indexbackfiller_test.go | 2 +- pkg/sql/internal.go | 4 +- pkg/sql/pgwire/conn.go | 6 +-- pkg/sql/pgwire/conn_test.go | 4 +- pkg/sql/pgwire/server.go | 6 +-- pkg/sql/planner.go | 2 +- pkg/sql/row/fetcher.go | 2 +- pkg/sql/row/fetcher_test.go | 2 +- .../rowcontainer/disk_row_container_test.go | 8 +-- .../rowcontainer/hash_row_container_test.go | 28 +++++----- .../numbered_row_container_test.go | 8 +-- pkg/sql/rowcontainer/row_container_test.go | 26 ++++----- pkg/sql/rowexec/joinreader.go | 4 +- pkg/sql/rowexec/joinreader_test.go | 4 +- pkg/sql/rowexec/windower.go | 2 +- .../show_create_all_tables_builtin_test.go | 2 +- pkg/sql/sem/eval/context.go | 2 +- pkg/sql/sqlstats/sslocal/sql_stats.go | 2 +- pkg/sql/stats/row_sampling_test.go | 2 +- pkg/sql/txn_fingerprint_id_cache.go | 2 +- pkg/sql/txn_state.go | 2 +- pkg/sql/txn_state_test.go | 2 +- pkg/storage/pebble_mvcc_scanner_test.go | 2 +- pkg/ts/query_test.go | 8 +-- pkg/ts/rollup_test.go | 4 +- pkg/ts/server.go | 4 +- pkg/util/mon/bytes_usage.go | 22 ++++---- pkg/util/mon/bytes_usage_test.go | 53 +++++++++++++++---- 57 files changed, 182 insertions(+), 141 deletions(-) diff --git
a/pkg/base/config.go b/pkg/base/config.go index 3c89313f86f6..8e1f09765d1e 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -615,7 +615,7 @@ func TempStorageConfigFromEnv( maxSizeBytes/10, /* noteworthy */ st, ) - monitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(maxSizeBytes)) + monitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(maxSizeBytes)) return TempStorageConfig{ InMemory: inMem, Mon: monitor, diff --git a/pkg/base/test_server_args.go b/pkg/base/test_server_args.go index ddbad37b71e7..1617abf523d4 100644 --- a/pkg/base/test_server_args.go +++ b/pkg/base/test_server_args.go @@ -219,7 +219,7 @@ func DefaultTestTempStorageConfigWithSize( maxSizeBytes/10, /* noteworthy */ st, ) - monitor.Start(context.Background(), nil /* pool */, mon.MakeStandaloneBudget(maxSizeBytes)) + monitor.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(maxSizeBytes)) return TempStorageConfig{ InMemory: true, Mon: monitor, diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 1e31b54331c8..b04b2932af31 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -8027,7 +8027,7 @@ func TestReadBackupManifestMemoryMonitoring(t *testing.T) { require.NoError(t, err) m := mon.NewMonitor("test-monitor", mon.MemoryResource, nil, nil, 0, 0, st) - m.Start(ctx, nil, mon.MakeStandaloneBudget(128<<20)) + m.Start(ctx, nil, mon.NewStandaloneBudget(128<<20)) mem := m.MakeBoundAccount() encOpts := &jobspb.BackupEncryptionOptions{ Mode: jobspb.EncryptionMode_Passphrase, @@ -9040,7 +9040,7 @@ func TestBackupMemMonitorSSTSinkQueueSize(t *testing.T) { ) ctx := context.Background() byteLimit := 14 << 20 // 14 MiB - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(int64(byteLimit))) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(int64(byteLimit))) defer memoryMonitor.Stop(ctx) params := base.TestClusterArgs{} knobs := base.TestingKnobs{ diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index d9af6c52b6c8..ff60fddf9886 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -251,7 +251,7 @@ func (ca *changeAggregator) Start(ctx context.Context) { } limit := changefeedbase.PerChangefeedMemLimit.Get(&ca.flowCtx.Cfg.Settings.SV) kvFeedMemMon := mon.NewMonitorInheritWithLimit("kvFeed", limit, pool) - kvFeedMemMon.Start(ctx, pool, mon.BoundAccount{}) + kvFeedMemMon.StartNoReserved(ctx, pool) ca.kvFeedMemMon = kvFeedMemMon // The job registry has a set of metrics used to monitor the various jobs it diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index f3f86a261de8..4d1e15dae875 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -6443,7 +6443,7 @@ func startMonitorWithBudget(budget int64) *mon.BytesMonitor { nil, nil, 128 /* small allocation increment */, 100, cluster.MakeTestingClusterSettings()) - mm.Start(context.Background(), nil, mon.MakeStandaloneBudget(budget)) + mm.Start(context.Background(), nil, mon.NewStandaloneBudget(budget)) return mm } diff --git a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go index 55a691f197d3..420afdd51893 100644 --- a/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go +++ b/pkg/ccl/changefeedccl/kvevent/blocking_buffer_test.go @@ -57,7 +57,7 @@ func getBoundAccountWithBudget(budget int64) (account 
mon.BoundAccount, cleanup nil, nil, 128 /* small allocation increment */, 100, cluster.MakeTestingClusterSettings()) - mm.Start(context.Background(), nil, mon.MakeStandaloneBudget(budget)) + mm.Start(context.Background(), nil, mon.NewStandaloneBudget(budget)) return mm.MakeBoundAccount(), func() { mm.Stop(context.Background()) } } diff --git a/pkg/kv/bulk/kv_buf_test.go b/pkg/kv/bulk/kv_buf_test.go index 06ac4d486a16..0eee3d3f74db 100644 --- a/pkg/kv/bulk/kv_buf_test.go +++ b/pkg/kv/bulk/kv_buf_test.go @@ -56,7 +56,9 @@ func TestKvBuf(t *testing.T) { src, totalSize := makeTestData(50000) ctx := context.Background() - none := mon.NewMonitorWithLimit("none", mon.MemoryResource, 0, nil, nil, 0, 0, nil).MakeBoundAccount() + noneMonitor := mon.NewMonitorWithLimit("none", mon.MemoryResource, 0, nil, nil, 0, 0, nil) + noneMonitor.StartNoReserved(ctx, nil /* pool */) + none := noneMonitor.MakeBoundAccount() lots := mon.NewUnlimitedMonitor(ctx, "lots", mon.MemoryResource, nil, nil, 0, nil).MakeBoundAccount() // Write everything to our buf. diff --git a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go index 63f31baee688..90652e6b22c1 100644 --- a/pkg/kv/kvclient/kvstreamer/results_buffer_test.go +++ b/pkg/kv/kvclient/kvstreamer/results_buffer_test.go @@ -55,7 +55,7 @@ func TestInOrderResultsBuffer(t *testing.T) { math.MaxInt64, /* noteworthy */ st, ) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) budget := newBudget(nil /* acc */, math.MaxInt /* limitBytes */) diff --git a/pkg/kv/kvclient/kvstreamer/streamer_test.go b/pkg/kv/kvclient/kvstreamer/streamer_test.go index 2a003fc51793..3112e52fa85d 100644 --- a/pkg/kv/kvclient/kvstreamer/streamer_test.go +++ b/pkg/kv/kvclient/kvstreamer/streamer_test.go @@ -166,7 +166,7 @@ func TestStreamerBudgetErrorInEnqueue(t *testing.T) { math.MaxInt64, /* noteworthy */ cluster.MakeTestingClusterSettings(), ) - rootMemMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(rootPoolSize)) + rootMemMonitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(rootPoolSize)) defer rootMemMonitor.Stop(ctx) acc := rootMemMonitor.MakeBoundAccount() diff --git a/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go b/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go index 4993844815e8..d3a7c2011a78 100644 --- a/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go +++ b/pkg/kv/kvclient/rangefeed/db_adapter_external_test.go @@ -39,7 +39,7 @@ func startMonitorWithBudget(budget int64) *mon.BytesMonitor { nil, nil, 128 /* small allocation increment */, 100, cluster.MakeTestingClusterSettings()) - mm.Start(context.Background(), nil, mon.MakeStandaloneBudget(budget)) + mm.Start(context.Background(), nil, mon.NewStandaloneBudget(budget)) return mm } diff --git a/pkg/kv/kvserver/rangefeed/budget.go b/pkg/kv/kvserver/rangefeed/budget.go index 5f4b9d915516..83336f0ba121 100644 --- a/pkg/kv/kvserver/rangefeed/budget.go +++ b/pkg/kv/kvserver/rangefeed/budget.go @@ -328,14 +328,14 @@ func NewBudgetFactory(ctx context.Context, config BudgetFactoryConfig) *BudgetFa systemRangeFeedBudget, config.rootMon) systemRangeMonitor.SetMetrics(metrics.SystemBytesCount, nil /* maxHist */) systemRangeMonitor.Start(ctx, config.rootMon, - mon.MakeStandaloneBudget(systemRangeFeedBudget)) + mon.NewStandaloneBudget(systemRangeFeedBudget)) rangeFeedPoolMonitor := mon.NewMonitorInheritWithLimit( "rangefeed-monitor", 
config.totalRangeReedBudget, config.rootMon) rangeFeedPoolMonitor.SetMetrics(metrics.SharedBytesCount, nil /* maxHist */) - rangeFeedPoolMonitor.Start(ctx, config.rootMon, mon.BoundAccount{}) + rangeFeedPoolMonitor.StartNoReserved(ctx, config.rootMon) return &BudgetFactory{ limit: config.provisionalFeedLimit, diff --git a/pkg/kv/kvserver/rangefeed/budget_test.go b/pkg/kv/kvserver/rangefeed/budget_test.go index ffdb650a2f83..f48761866716 100644 --- a/pkg/kv/kvserver/rangefeed/budget_test.go +++ b/pkg/kv/kvserver/rangefeed/budget_test.go @@ -28,7 +28,7 @@ func TestFeedBudget(t *testing.T) { *FeedBudget, *mon.BytesMonitor, *mon.BoundAccount, ) { m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - m.Start(context.Background(), nil, mon.MakeStandaloneBudget(poolSize)) + m.Start(context.Background(), nil, mon.NewStandaloneBudget(poolSize)) b := m.MakeBoundAccount() s := cluster.MakeTestingClusterSettings() @@ -190,7 +190,7 @@ func TestBudgetFactory(t *testing.T) { s := cluster.MakeTestingClusterSettings() rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, s) - rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000)) + rootMon.Start(context.Background(), nil, mon.NewStandaloneBudget(10000000)) bf := NewBudgetFactory(context.Background(), CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, budgetLowThresholdFn(10000), &s.SV)) @@ -214,7 +214,7 @@ func TestDisableBudget(t *testing.T) { s := cluster.MakeTestingClusterSettings() rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, s) - rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000)) + rootMon.Start(context.Background(), nil, mon.NewStandaloneBudget(10000000)) bf := NewBudgetFactory(context.Background(), CreateBudgetFactoryConfig(rootMon, 10000, time.Second*5, func(_ int64) int64 { return 0 @@ -228,7 +228,7 @@ func TestDisableBudgetOnTheFly(t *testing.T) { s := cluster.MakeTestingClusterSettings() m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - m.Start(context.Background(), nil, mon.MakeStandaloneBudget(100000)) + m.Start(context.Background(), nil, mon.NewStandaloneBudget(100000)) bf := NewBudgetFactory(context.Background(), CreateBudgetFactoryConfig( m, @@ -265,7 +265,7 @@ func TestDisableBudgetOnTheFly(t *testing.T) { func TestConfigFactory(t *testing.T) { s := cluster.MakeTestingClusterSettings() rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000)) + rootMon.Start(context.Background(), nil, mon.NewStandaloneBudget(10000000)) // Check provisionalFeedLimit is computed. 
config := CreateBudgetFactoryConfig(rootMon, 100000, time.Second*5, budgetLowThresholdFn(10000), @@ -285,7 +285,7 @@ func TestConfigFactory(t *testing.T) { func TestBudgetLimits(t *testing.T) { s := cluster.MakeTestingClusterSettings() rootMon := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - rootMon.Start(context.Background(), nil, mon.MakeStandaloneBudget(10000000)) + rootMon.Start(context.Background(), nil, mon.NewStandaloneBudget(10000000)) provisionalSize := int64(10000) adjustedSize := int64(1000) diff --git a/pkg/kv/kvserver/rangefeed/processor_test.go b/pkg/kv/kvserver/rangefeed/processor_test.go index cb3291b85149..8cae2f52e28b 100644 --- a/pkg/kv/kvserver/rangefeed/processor_test.go +++ b/pkg/kv/kvserver/rangefeed/processor_test.go @@ -1080,7 +1080,7 @@ func TestBudgetReleaseOnProcessorStop(t *testing.T) { s := cluster.MakeTestingClusterSettings() m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - m.Start(context.Background(), nil, mon.MakeStandaloneBudget(math.MaxInt64)) + m.Start(context.Background(), nil, mon.NewStandaloneBudget(math.MaxInt64)) //budgetEnabled := int32(1) b := m.MakeBoundAccount() fb := NewFeedBudget(&b, 0, &s.SV) @@ -1228,7 +1228,7 @@ func TestBudgetReleaseOnLastStreamError(t *testing.T) { func newTestBudget(limit int64) *FeedBudget { s := cluster.MakeTestingClusterSettings() m := mon.NewMonitor("rangefeed", mon.MemoryResource, nil, nil, 1, math.MaxInt64, nil) - m.Start(context.Background(), nil, mon.MakeStandaloneBudget(limit)) + m.Start(context.Background(), nil, mon.NewStandaloneBudget(limit)) b := m.MakeBoundAccount() fb := NewFeedBudget(&b, 0, &s.SV) return fb diff --git a/pkg/kv/kvserver/ts_maintenance_queue_test.go b/pkg/kv/kvserver/ts_maintenance_queue_test.go index bb1c2f2f5e2a..00b4beb90155 100644 --- a/pkg/kv/kvserver/ts_maintenance_queue_test.go +++ b/pkg/kv/kvserver/ts_maintenance_queue_test.go @@ -292,7 +292,7 @@ func TestTimeSeriesMaintenanceQueueServer(t *testing.T) { math.MaxInt64, /* noteworthy */ cluster.MakeTestingClusterSettings(), ) - memMon.Start(context.Background(), nil /* pool */, mon.MakeStandaloneBudget(math.MaxInt64)) + memMon.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) defer memMon.Stop(context.Background()) memContext := ts.MakeQueryMemoryContext( memMon, diff --git a/pkg/server/server.go b/pkg/server/server.go index b75c0b10efb2..735bb2737ee0 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -503,7 +503,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { }) kvMemoryMonitor := mon.NewMonitorInheritWithLimit( "kv-mem", 0 /* limit */, sqlMonitorAndMetrics.rootSQLMemoryMonitor) - kvMemoryMonitor.Start(ctx, sqlMonitorAndMetrics.rootSQLMemoryMonitor, mon.BoundAccount{}) + kvMemoryMonitor.StartNoReserved(ctx, sqlMonitorAndMetrics.rootSQLMemoryMonitor) rangeReedBudgetFactory := serverrangefeed.NewBudgetFactory( ctx, serverrangefeed.CreateBudgetFactoryConfig( diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 6d13540e8a99..1babfe92f353 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -368,7 +368,7 @@ func newRootSQLMemoryMonitor(opts monitorAndMetricsOptions) monitorAndMetrics { // serves as a parent for a memory monitor that accounts for memory used in // the KV layer at the same node. 
rootSQLMemoryMonitor.Start( - context.Background(), nil, mon.MakeStandaloneBudget(opts.memoryPoolSize)) + context.Background(), nil, mon.NewStandaloneBudget(opts.memoryPoolSize)) return monitorAndMetrics{ rootSQLMemoryMonitor: rootSQLMemoryMonitor, rootSQLMetrics: rootSQLMetrics, @@ -496,7 +496,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { bulkMetrics := bulk.MakeBulkMetrics(cfg.HistogramWindowInterval()) cfg.registry.AddMetricStruct(bulkMetrics) bulkMemoryMonitor.SetMetrics(bulkMetrics.CurBytesCount, bulkMetrics.MaxBytesHist) - bulkMemoryMonitor.Start(context.Background(), rootSQLMemoryMonitor, mon.BoundAccount{}) + bulkMemoryMonitor.StartNoReserved(context.Background(), rootSQLMemoryMonitor) backfillMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backfill-mon") backupMemoryMonitor := execinfra.NewMonitor(ctx, bulkMemoryMonitor, "backup-mon") @@ -504,7 +504,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { serverCacheMemoryMonitor := mon.NewMonitorInheritWithLimit( "server-cache-mon", 0 /* limit */, rootSQLMemoryMonitor, ) - serverCacheMemoryMonitor.Start(context.Background(), rootSQLMemoryMonitor, mon.BoundAccount{}) + serverCacheMemoryMonitor.StartNoReserved(context.Background(), rootSQLMemoryMonitor) // Set up the DistSQL temp engine. diff --git a/pkg/sql/catalog/descs/collection_test.go b/pkg/sql/catalog/descs/collection_test.go index f87538cc1688..84822cfcab9b 100644 --- a/pkg/sql/catalog/descs/collection_test.go +++ b/pkg/sql/catalog/descs/collection_test.go @@ -632,7 +632,7 @@ func TestKVDescriptorsProperlyUsesMemoryMonitoring(t *testing.T) { nil, nil, -1, 0, cluster.MakeTestingClusterSettings()) // Start the monitor with unlimited budget. - monitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + monitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) // Create a `Collection` with monitor hooked up. col := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory. @@ -650,7 +650,7 @@ func TestKVDescriptorsProperlyUsesMemoryMonitoring(t *testing.T) { // Restart the monitor to a smaller budget (in fact, let's be bold by setting it to be only one byte below // what has been allocated in the previous round). - monitor.Start(ctx, nil, mon.MakeStandaloneBudget(allocatedMemoryInBytes-1)) + monitor.Start(ctx, nil, mon.NewStandaloneBudget(allocatedMemoryInBytes-1)) require.Equal(t, int64(0), monitor.AllocBytes()) // Repeat the process again and assert this time memory allocation will err out. 
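An aside on the bug class this patch fixes, before the next file: below is a minimal, self-contained Go sketch of the pass-by-value pitfall. The `account` and `monitor` types and the `startByValue`/`startByRef` helpers are hypothetical stand-ins invented for illustration, not the real `util/mon` API; only the value-versus-reference behavior is meant to mirror `mon.BoundAccount` and `(*mon.BytesMonitor).Start`.

package main

import "fmt"

// account is a stand-in for mon.BoundAccount: it tracks bytes reserved
// from some parent pool.
type account struct {
	used int64
}

// clear releases everything the account holds.
func (a *account) clear() { a.used = 0 }

// monitor is a stand-in for mon.BytesMonitor: it remembers the reserved
// account it was started with and releases it on stop.
type monitor struct {
	reserved *account
}

// startByValue mimics the old signature: the account is copied on the way
// in, so stop releases the copy and the caller's account never sees it.
func startByValue(reserved account) *monitor {
	return &monitor{reserved: &reserved} // points at the copy
}

// startByRef mimics the new signature: monitor and caller share one account.
func startByRef(reserved *account) *monitor {
	return &monitor{reserved: reserved}
}

// stop releases the reserved account, as stopping a monitor does.
func (m *monitor) stop() { m.reserved.clear() }

func main() {
	res := account{used: 1 << 20}
	m := startByValue(res)
	m.stop()
	fmt.Println("by value, after stop:", res.used) // 1048576: out of sync

	res = account{used: 1 << 20}
	m = startByRef(&res)
	m.stop()
	fmt.Println("by reference, after stop:", res.used) // 0: in sync
}

Under the old by-value signature the release is invisible to the caller, which is exactly the desync described in the commit message; passing a pointer keeps both sides consistent, and `StartNoReserved` sidesteps the question entirely when there is nothing to pre-reserve.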
diff --git a/pkg/sql/closed_session_cache.go b/pkg/sql/closed_session_cache.go index d444a1860350..22b2bcc7d03b 100644 --- a/pkg/sql/closed_session_cache.go +++ b/pkg/sql/closed_session_cache.go @@ -81,7 +81,7 @@ func NewClosedSessionCache( c.mu.acc = monitor.MakeBoundAccount() c.mon = monitor - c.mon.Start(context.Background(), parentMon, mon.BoundAccount{}) + c.mon.StartNoReserved(context.Background(), parentMon) return c } diff --git a/pkg/sql/colexec/colexecargs/monitor_registry.go b/pkg/sql/colexec/colexecargs/monitor_registry.go index b4d5f89dc3cf..83c523d004c1 100644 --- a/pkg/sql/colexec/colexecargs/monitor_registry.go +++ b/pkg/sql/colexec/colexecargs/monitor_registry.go @@ -89,7 +89,7 @@ func (r *MonitorRegistry) CreateMemAccountForSpillStrategyWithLimit( } monitorName := r.getMemMonitorName(opName, processorID, "limited" /* suffix */) bufferingOpMemMonitor := mon.NewMonitorInheritWithLimit(monitorName, limit, flowCtx.EvalCtx.Mon) - bufferingOpMemMonitor.Start(ctx, flowCtx.EvalCtx.Mon, mon.BoundAccount{}) + bufferingOpMemMonitor.StartNoReserved(ctx, flowCtx.EvalCtx.Mon) r.monitors = append(r.monitors, bufferingOpMemMonitor) bufferingMemAccount := bufferingOpMemMonitor.MakeBoundAccount() r.accounts = append(r.accounts, &bufferingMemAccount) diff --git a/pkg/sql/colflow/draining_test.go b/pkg/sql/colflow/draining_test.go index 11755c375ce3..76332fbde3c9 100644 --- a/pkg/sql/colflow/draining_test.go +++ b/pkg/sql/colflow/draining_test.go @@ -47,7 +47,7 @@ func TestDrainingAfterRemoteError(t *testing.T) { math.MaxInt64, cluster.MakeTestingClusterSettings(), ) - diskMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(1)) + diskMonitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(1)) // Set up a two node cluster. tempStorageConfig := base.TempStorageConfig{InMemory: true, Mon: diskMonitor} diff --git a/pkg/sql/colflow/explain_vec.go b/pkg/sql/colflow/explain_vec.go index ad2b128da81d..48a7c2e75a28 100644 --- a/pkg/sql/colflow/explain_vec.go +++ b/pkg/sql/colflow/explain_vec.go @@ -74,7 +74,7 @@ func convertToVecTree( math.MaxInt64, /* noteworthy */ flowCtx.Cfg.Settings, ) - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer memoryMonitor.Stop(ctx) defer creator.cleanup(ctx) opChains, _, err = creator.setupFlow(ctx, flowCtx, flow.Processors, localProcessors, fuseOpt) diff --git a/pkg/sql/colflow/vectorized_flow_space_test.go b/pkg/sql/colflow/vectorized_flow_space_test.go index 6521e7ff931d..da3382c5496a 100644 --- a/pkg/sql/colflow/vectorized_flow_space_test.go +++ b/pkg/sql/colflow/vectorized_flow_space_test.go @@ -79,9 +79,9 @@ func TestVectorizeInternalMemorySpaceError(t *testing.T) { } memMon := mon.NewMonitor("MemoryMonitor", mon.MemoryResource, nil, nil, 0, math.MaxInt64, st) if success { - memMon.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memMon.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) } else { - memMon.Start(ctx, nil, mon.MakeStandaloneBudget(1)) + memMon.Start(ctx, nil, mon.NewStandaloneBudget(1)) } defer memMon.Stop(ctx) acc := memMon.MakeBoundAccount() @@ -204,7 +204,7 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) { memMon := mon.NewMonitor("MemoryMonitor", mon.MemoryResource, nil, nil, 0, math.MaxInt64, st) flowCtx.Cfg.TestingKnobs = execinfra.TestingKnobs{} if expectNoMemoryError { - memMon.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memMon.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) if !success { 
// These are the cases that we expect in-memory operators to hit a // memory error. To enable testing this case, force disk spills. We @@ -213,7 +213,7 @@ func TestVectorizeAllocatorSpaceError(t *testing.T) { flowCtx.Cfg.TestingKnobs.ForceDiskSpill = true } } else { - memMon.Start(ctx, nil, mon.MakeStandaloneBudget(1)) + memMon.Start(ctx, nil, mon.NewStandaloneBudget(1)) flowCtx.Cfg.TestingKnobs.ForceDiskSpill = true } defer memMon.Stop(ctx) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ac182d69cbbd..c19f82bc8ba3 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -748,7 +748,7 @@ func (h ConnectionHandler) GetQueryCancelKey() pgwirecancel.BackendKeyData { // If not nil, reserved represents memory reserved for the connection. The // connExecutor takes ownership of this memory. func (s *Server) ServeConn( - ctx context.Context, h ConnectionHandler, reserved mon.BoundAccount, cancel context.CancelFunc, + ctx context.Context, h ConnectionHandler, reserved *mon.BoundAccount, cancel context.CancelFunc, ) error { defer func() { r := recover() @@ -989,7 +989,7 @@ func (s *Server) newConnExecutorWithTxn( // The new transaction stuff below requires active monitors and traces, so // we need to activate the executor now. - ex.activate(ctx, parentMon, mon.BoundAccount{}) + ex.activate(ctx, parentMon, &mon.BoundAccount{}) // Perform some surgery on the executor - replace its state machine and // initialize the state. @@ -1724,7 +1724,7 @@ func (ex *connExecutor) sessionData() *sessiondata.SessionData { // reserved: Memory reserved for the connection. The connExecutor takes // ownership of this memory. func (ex *connExecutor) activate( - ctx context.Context, parentMon *mon.BytesMonitor, reserved mon.BoundAccount, + ctx context.Context, parentMon *mon.BytesMonitor, reserved *mon.BoundAccount, ) { // Note: we pass `reserved` to sessionRootMon where it causes it to act as a // buffer. This is not done for sessionMon nor state.mon: these monitors don't @@ -1732,7 +1732,7 @@ func (ex *connExecutor) activate( // soon as the first allocation. This is acceptable because the session is // single threaded, and the point of buffering is just to avoid contention. ex.mon.Start(ctx, parentMon, reserved) - ex.sessionMon.Start(ctx, ex.mon, mon.BoundAccount{}) + ex.sessionMon.StartNoReserved(ctx, ex.mon) // Enable the trace if configured. if traceSessionEventLogEnabled.Get(&ex.server.cfg.Settings.SV) { @@ -1787,7 +1787,7 @@ func (ex *connExecutor) activate( func (ex *connExecutor) run( ctx context.Context, parentMon *mon.BytesMonitor, - reserved mon.BoundAccount, + reserved *mon.BoundAccount, onCancel context.CancelFunc, ) (err error) { if !ex.activated { @@ -2397,7 +2397,7 @@ func (ex *connExecutor) execCopyIn( // HACK: We're reaching inside ex.state and starting the monitor. Normally // that's driven by the state machine, but we're bypassing the state machine // here. 
- ex.state.mon.Start(ctx, ex.sessionMon, mon.BoundAccount{} /* reserved */) + ex.state.mon.StartNoReserved(ctx, ex.sessionMon) monToStop = ex.state.mon } txnOpt.resetPlanner = func(ctx context.Context, p *planner, txn *kv.Txn, txnTS time.Time, stmtTS time.Time) { diff --git a/pkg/sql/conn_executor_internal_test.go b/pkg/sql/conn_executor_internal_test.go index 0ec6c6ae731b..29af67f84164 100644 --- a/pkg/sql/conn_executor_internal_test.go +++ b/pkg/sql/conn_executor_internal_test.go @@ -358,7 +358,7 @@ func startConnExecutor( // routine, we're going to push commands into the StmtBuf and, from time to // time, collect and check their results. go func() { - finished <- s.ServeConn(ctx, conn, mon.BoundAccount{}, nil /* cancel */) + finished <- s.ServeConn(ctx, conn, &mon.BoundAccount{}, nil /* cancel */) }() return buf, syncResults, finished, stopper, resultChannel, nil } @@ -402,7 +402,7 @@ CREATE TEMPORARY TABLE foo(); done := make(chan error) go func() { - done <- srv.ServeConn(ctx, connHandler, mon.BoundAccount{}, nil /* cancel */) + done <- srv.ServeConn(ctx, connHandler, &mon.BoundAccount{}, nil /* cancel */) }() results := <-flushed require.Len(t, results, 6) // We expect results for 5 statements + sync. diff --git a/pkg/sql/delete_preserving_index_test.go b/pkg/sql/delete_preserving_index_test.go index 9dadb18178d1..bb7c783a48fc 100644 --- a/pkg/sql/delete_preserving_index_test.go +++ b/pkg/sql/delete_preserving_index_test.go @@ -741,7 +741,7 @@ func fetchIndex( var fetcher row.Fetcher var alloc tree.DatumAlloc - mm := mon.MakeStandaloneBudget(1 << 30) + mm := mon.NewStandaloneBudget(1 << 30) idx, err := table.FindIndexWithName(indexName) require.NoError(t, err) colIdxMap := catalog.ColumnIDToOrdinalMap(table.PublicColumns()) diff --git a/pkg/sql/distsql/server.go b/pkg/sql/distsql/server.go index 796c1746e1e4..df1a6c819f02 100644 --- a/pkg/sql/distsql/server.go +++ b/pkg/sql/distsql/server.go @@ -89,7 +89,7 @@ func NewServer( cfg.Settings, ), } - ds.memMonitor.Start(ctx, cfg.ParentMemoryMonitor, mon.BoundAccount{}) + ds.memMonitor.StartNoReserved(ctx, cfg.ParentMemoryMonitor) // We have to initialize the flow scheduler at the same time we're creating // the DistSQLServer because the latter will be registered as a gRPC service // right away, so the RPCs might start coming in pretty much right after the @@ -277,7 +277,7 @@ func (ds *ServerImpl) setupFlow( noteworthyMemoryUsageBytes, ds.Settings, ) - monitor.Start(ctx, parentMonitor, mon.BoundAccount{}) + monitor.StartNoReserved(ctx, parentMonitor) makeLeaf := func() (*kv.Txn, error) { tis := req.LeafTxnInputState diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index c86dd45565ef..c9af2c53f12e 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -1269,7 +1269,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( noteworthyMemoryUsageBytes, dsp.distSQLSrv.Settings, ) - subqueryMonitor.Start(ctx, evalCtx.Mon, mon.BoundAccount{}) + subqueryMonitor.StartNoReserved(ctx, evalCtx.Mon) defer subqueryMonitor.Stop(ctx) subqueryMemAccount := subqueryMonitor.MakeBoundAccount() @@ -1622,7 +1622,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery( noteworthyMemoryUsageBytes, dsp.distSQLSrv.Settings, ) - postqueryMonitor.Start(ctx, evalCtx.Mon, mon.BoundAccount{}) + postqueryMonitor.StartNoReserved(ctx, evalCtx.Mon) defer postqueryMonitor.Stop(ctx) postqueryMemAccount := postqueryMonitor.MakeBoundAccount() diff --git a/pkg/sql/execinfra/processorsbase.go b/pkg/sql/execinfra/processorsbase.go index 
ba33a4254dec..8ab45ff28b35 100644 --- a/pkg/sql/execinfra/processorsbase.go +++ b/pkg/sql/execinfra/processorsbase.go @@ -947,7 +947,7 @@ func NewMonitor( ctx context.Context, parent *mon.BytesMonitor, name redact.RedactableString, ) *mon.BytesMonitor { monitor := mon.NewMonitorInheritWithLimit(name, 0 /* limit */, parent) - monitor.Start(ctx, parent, mon.BoundAccount{}) + monitor.StartNoReserved(ctx, parent) return monitor } @@ -961,7 +961,7 @@ func NewLimitedMonitor( ctx context.Context, parent *mon.BytesMonitor, flowCtx *FlowCtx, name redact.RedactableString, ) *mon.BytesMonitor { limitedMon := mon.NewMonitorInheritWithLimit(name, GetWorkMemLimit(flowCtx), parent) - limitedMon.Start(ctx, parent, mon.BoundAccount{}) + limitedMon.StartNoReserved(ctx, parent) return limitedMon } diff --git a/pkg/sql/execinfra/testutils.go b/pkg/sql/execinfra/testutils.go index f2e2bec148bc..e6ca1f696754 100644 --- a/pkg/sql/execinfra/testutils.go +++ b/pkg/sql/execinfra/testutils.go @@ -95,7 +95,7 @@ func NewTestMemMonitor(ctx context.Context, st *cluster.Settings) *mon.BytesMoni math.MaxInt64, /* noteworthy */ st, ) - memMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) return memMonitor } @@ -111,7 +111,7 @@ func NewTestDiskMonitor(ctx context.Context, st *cluster.Settings) *mon.BytesMon math.MaxInt64, st, ) - diskMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) return diskMonitor } diff --git a/pkg/sql/indexbackfiller_test.go b/pkg/sql/indexbackfiller_test.go index e5febb3a01d2..ae803037495a 100644 --- a/pkg/sql/indexbackfiller_test.go +++ b/pkg/sql/indexbackfiller_test.go @@ -375,7 +375,7 @@ INSERT INTO foo VALUES (1), (10), (100); ) []tree.Datums { t.Helper() - mm := mon.MakeStandaloneBudget(1 << 30) + mm := mon.NewStandaloneBudget(1 << 30) idx, err := table.FindIndexWithID(indexID) colIDsNeeded := idx.CollectKeyColumnIDs() if idx.Primary() { diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 55ec54c6eb6e..721b9a30c2e5 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -108,7 +108,7 @@ func MakeInternalExecutor( math.MaxInt64, /* noteworthy */ settings, ) - monitor.Start(ctx, s.pool, mon.BoundAccount{}) + monitor.StartNoReserved(ctx, s.pool) return InternalExecutor{ s: s, mon: monitor, @@ -203,7 +203,7 @@ func (ie *InternalExecutor) initConnEx( wg.Add(1) go func() { - if err := ex.run(ctx, ie.mon, mon.BoundAccount{} /*reserved*/, nil /* cancel */); err != nil { + if err := ex.run(ctx, ie.mon, &mon.BoundAccount{} /*reserved*/, nil /* cancel */); err != nil { sqltelemetry.RecordError(ctx, err, &ex.server.cfg.Settings.SV) errCallback(err) } diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index f92e125c17ab..65744505e859 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -155,7 +155,7 @@ func (s *Server) serveConn( ctx context.Context, netConn net.Conn, sArgs sql.SessionArgs, - reserved mon.BoundAccount, + reserved *mon.BoundAccount, connStart time.Time, authOpt authOptions, ) { @@ -276,7 +276,7 @@ func (c *conn) serveImpl( ctx context.Context, draining func() bool, sqlServer *sql.Server, - reserved mon.BoundAccount, + reserved *mon.BoundAccount, authOpt authOptions, ) { defer func() { _ = c.conn.Close() }() @@ -632,7 +632,7 @@ func (c *conn) processCommandsAsync( authOpt authOptions, ac AuthConn, sqlServer *sql.Server, - reserved mon.BoundAccount, + reserved 
*mon.BoundAccount, cancelConn func(), onDefaultIntSizeChange func(newSize int32), ) <-chan error { diff --git a/pkg/sql/pgwire/conn_test.go b/pkg/sql/pgwire/conn_test.go index c2b84ec6c960..98be5211ab5e 100644 --- a/pkg/sql/pgwire/conn_test.go +++ b/pkg/sql/pgwire/conn_test.go @@ -116,7 +116,7 @@ func TestConn(t *testing.T) { func() bool { return false }, /* draining */ // sqlServer - nil means don't create a command processor and a write side of the conn nil, - mon.BoundAccount{}, /* reserved */ + &mon.BoundAccount{}, /* reserved */ authOptions{testingSkipAuth: true, connType: hba.ConnHostAny}, ) return nil @@ -1082,7 +1082,7 @@ func TestMaliciousInputs(t *testing.T) { ctx, func() bool { return false }, /* draining */ nil, /* sqlServer */ - mon.BoundAccount{}, /* reserved */ + &mon.BoundAccount{}, /* reserved */ authOptions{testingSkipAuth: true, connType: hba.ConnHostAny}, ) if err := <-errChan; err != nil { diff --git a/pkg/sql/pgwire/server.go b/pkg/sql/pgwire/server.go index 667d96b371fd..7c88e17246e0 100644 --- a/pkg/sql/pgwire/server.go +++ b/pkg/sql/pgwire/server.go @@ -344,7 +344,7 @@ func MakeServer( nil, /* curCount */ nil, /* maxHist */ 0, noteworthySQLMemoryUsageBytes, st) - server.sqlMemoryPool.Start(context.Background(), parentMemoryMonitor, mon.BoundAccount{}) + server.sqlMemoryPool.StartNoReserved(context.Background(), parentMemoryMonitor) server.SQLServer = sql.NewServer(executorConfig, server.sqlMemoryPool) // TODO(knz,ben): Use a cluster setting for this. @@ -355,7 +355,7 @@ func MakeServer( server.metrics.ConnMemMetrics.CurBytesCount, server.metrics.ConnMemMetrics.MaxBytesHist, int64(connReservationBatchSize)*baseSQLMemoryBudget, noteworthyConnMemoryUsageBytes, st) - server.connMonitor.Start(context.Background(), server.sqlMemoryPool, mon.BoundAccount{}) + server.connMonitor.StartNoReserved(context.Background(), server.sqlMemoryPool) server.mu.Lock() server.mu.connCancelMap = make(cancelChanMap) @@ -872,7 +872,7 @@ func (s *Server) ServeConn(ctx context.Context, conn net.Conn, socketType Socket // This includes authentication. 
s.serveConn( ctx, conn, sArgs, - reserved, + &reserved, connStart, authOptions{ connType: connType, diff --git a/pkg/sql/planner.go b/pkg/sql/planner.go index 55299264ce69..0e770dc3a4d9 100644 --- a/pkg/sql/planner.go +++ b/pkg/sql/planner.go @@ -360,7 +360,7 @@ func newInternalPlanner( memMetrics.CurBytesCount, memMetrics.MaxBytesHist, -1, /* increment */ noteworthyInternalMemoryUsageBytes, execCfg.Settings) - plannerMon.Start(ctx, execCfg.RootMemoryMonitor, mon.BoundAccount{}) + plannerMon.StartNoReserved(ctx, execCfg.RootMemoryMonitor) smi := &sessionDataMutatorIterator{ sds: sds, diff --git a/pkg/sql/row/fetcher.go b/pkg/sql/row/fetcher.go index 7f1e4ae3e3dd..8ba7dd4b4c74 100644 --- a/pkg/sql/row/fetcher.go +++ b/pkg/sql/row/fetcher.go @@ -262,7 +262,7 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error { if args.MemMonitor != nil { rf.mon = mon.NewMonitorInheritWithLimit("fetcher-mem", 0 /* limit */, args.MemMonitor) - rf.mon.Start(ctx, args.MemMonitor, mon.BoundAccount{}) + rf.mon.StartNoReserved(ctx, args.MemMonitor) memAcc := rf.mon.MakeBoundAccount() rf.kvFetcherMemAcc = &memAcc } diff --git a/pkg/sql/row/fetcher_test.go b/pkg/sql/row/fetcher_test.go index 2de241c2982b..db75ac7062e1 100644 --- a/pkg/sql/row/fetcher_test.go +++ b/pkg/sql/row/fetcher_test.go @@ -342,7 +342,7 @@ func TestRowFetcherMemoryLimits(t *testing.T) { // we can test whether scans of wide tables are prevented if // we have insufficient memory to do them. memMon := mon.NewMonitor("test", mon.MemoryResource, nil, nil, -1, 1000, settings) - memMon.Start(ctx, nil, mon.MakeStandaloneBudget(1<<20)) + memMon.Start(ctx, nil, mon.NewStandaloneBudget(1<<20)) defer memMon.Stop(ctx) txn := kv.NewTxn(ctx, kvDB, 0) rf := initFetcher(t, txn, args, false /*reverseScan*/, alloc, memMon) diff --git a/pkg/sql/rowcontainer/disk_row_container_test.go b/pkg/sql/rowcontainer/disk_row_container_test.go index 97469725c3eb..500bfa760c93 100644 --- a/pkg/sql/rowcontainer/disk_row_container_test.go +++ b/pkg/sql/rowcontainer/disk_row_container_test.go @@ -117,7 +117,7 @@ func TestDiskRowContainer(t *testing.T) { math.MaxInt64, st, ) - diskMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) t.Run("EncodeDecode", func(t *testing.T) { for i := 0; i < 100; i++ { @@ -387,7 +387,7 @@ func TestDiskRowContainerDiskFull(t *testing.T) { math.MaxInt64, st, ) - monitor.Start(ctx, nil, mon.MakeStandaloneBudget(0 /* capacity */)) + monitor.Start(ctx, nil, mon.NewStandaloneBudget(0 /* capacity */)) d := MakeDiskRowContainer( monitor, @@ -426,7 +426,7 @@ func TestDiskRowContainerFinalIterator(t *testing.T) { math.MaxInt64, st, ) - diskMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) d := MakeDiskRowContainer(diskMonitor, types.OneIntCol, nil /* ordering */, tempEngine) @@ -554,7 +554,7 @@ func TestDiskRowContainerUnsafeReset(t *testing.T) { math.MaxInt64, st, ) - monitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + monitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) d := MakeDiskRowContainer(monitor, types.OneIntCol, nil /* ordering */, tempEngine) defer d.Close(ctx) diff --git a/pkg/sql/rowcontainer/hash_row_container_test.go b/pkg/sql/rowcontainer/hash_row_container_test.go index c5f706a88481..b2c1c507bf6e 100644 --- 
a/pkg/sql/rowcontainer/hash_row_container_test.go +++ b/pkg/sql/rowcontainer/hash_row_container_test.go @@ -89,9 +89,9 @@ func TestHashDiskBackedRowContainer(t *testing.T) { // disk halfway through, keeps on adding rows, and then verifies that all // rows were properly added to the hashDiskBackedRowContainer. t.Run("NormalRun", func(t *testing.T) { - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) rc := getRowContainer() defer rc.Close(ctx) @@ -134,9 +134,9 @@ func TestHashDiskBackedRowContainer(t *testing.T) { }) t.Run("AddRowOutOfMem", func(t *testing.T) { - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(1)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) rc := getRowContainer() defer rc.Close(ctx) @@ -156,9 +156,9 @@ func TestHashDiskBackedRowContainer(t *testing.T) { }) t.Run("AddRowOutOfDisk", func(t *testing.T) { - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(1)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(1)) rc := getRowContainer() defer rc.Close(ctx) @@ -184,9 +184,9 @@ func TestHashDiskBackedRowContainer(t *testing.T) { // container to disk, and verifies that the iterator was recreated and points // to the appropriate row. t.Run("VerifyIteratorRecreation", func(t *testing.T) { - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) rc := getRowContainer() defer rc.Close(ctx) @@ -259,9 +259,9 @@ func TestHashDiskBackedRowContainer(t *testing.T) { // spills the container to disk, and verifies that the iterator was recreated // and is not valid. t.Run("VerifyIteratorRecreationAfterExhaustion", func(t *testing.T) { - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) rc := getRowContainer() defer rc.Close(ctx) @@ -376,9 +376,9 @@ func TestHashDiskBackedRowContainerPreservesMatchesAndMarks(t *testing.T) { // from the same buckets, and then verifies that all rows were properly added // to the hashDiskBackedRowContainer. 
t.Run("PreservingMatches", func(t *testing.T) { - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) rc := getRowContainer() defer rc.Close(ctx) @@ -425,9 +425,9 @@ func TestHashDiskBackedRowContainerPreservesMatchesAndMarks(t *testing.T) { // hashDiskBackedRowContainer, marks all rows belonging to the first bucket, // spills to disk, and checks that marks are preserved correctly. t.Run("PreservingMarks", func(t *testing.T) { - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) rc := getRowContainer() defer rc.Close(ctx) diff --git a/pkg/sql/rowcontainer/numbered_row_container_test.go b/pkg/sql/rowcontainer/numbered_row_container_test.go index a8b747ecc9fc..758fe3c162a0 100644 --- a/pkg/sql/rowcontainer/numbered_row_container_test.go +++ b/pkg/sql/rowcontainer/numbered_row_container_test.go @@ -44,7 +44,7 @@ func newTestDiskMonitor(ctx context.Context, st *cluster.Settings) *mon.BytesMon math.MaxInt64, /* noteworthy */ st, ) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) return diskMonitor } @@ -83,7 +83,7 @@ func TestNumberedRowContainerDeDuping(t *testing.T) { fmt.Printf("using smallMemoryBudget to spill to disk\n") memoryBudget = smallMemoryBudget } - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(int64(memoryBudget))) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(int64(memoryBudget))) defer memoryMonitor.Stop(ctx) // Use random types and random rows. @@ -169,7 +169,7 @@ func TestNumberedRowContainerIteratorCaching(t *testing.T) { // This memory budget allows for some caching, but typically cannot // cache all the rows. const memoryBudget = 12000 - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(memoryBudget)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(memoryBudget)) defer memoryMonitor.Stop(ctx) // Use random types and random rows. @@ -463,7 +463,7 @@ func makeMemMonitorAndStart( math.MaxInt64, /* noteworthy */ st, ) - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(budget)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(budget)) return memoryMonitor } diff --git a/pkg/sql/rowcontainer/row_container_test.go b/pkg/sql/rowcontainer/row_container_test.go index 275777739f66..e998d5a57f2a 100644 --- a/pkg/sql/rowcontainer/row_container_test.go +++ b/pkg/sql/rowcontainer/row_container_test.go @@ -237,9 +237,9 @@ func TestDiskBackedRowContainer(t *testing.T) { // halfway through, keeps on adding rows, and then verifies that all rows // were properly added to the DiskBackedRowContainer. 
t.Run("NormalRun", func(t *testing.T) { - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) defer func() { @@ -285,9 +285,9 @@ func TestDiskBackedRowContainer(t *testing.T) { }) t.Run("AddRowOutOfMem", func(t *testing.T) { - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(1)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) defer func() { @@ -311,9 +311,9 @@ func TestDiskBackedRowContainer(t *testing.T) { }) t.Run("AddRowOutOfDisk", func(t *testing.T) { - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(1)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(1)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(1)) defer diskMonitor.Stop(ctx) defer func() { @@ -363,7 +363,7 @@ func TestDiskBackedRowContainerDeDuping(t *testing.T) { ) diskMonitor := newTestDiskMonitor(ctx, st) - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer memoryMonitor.Stop(ctx) defer diskMonitor.Stop(ctx) @@ -497,9 +497,9 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { newOrdering := colinfo.ColumnOrdering{{ColIdx: 1, Direction: encoding.Ascending}} rng := rand.New(rand.NewSource(timeutil.Now().UnixNano())) - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) // SpillingHalfway adds half of all rows into DiskBackedIndexedRowContainer, @@ -662,9 +662,9 @@ func TestDiskBackedIndexedRowContainer(t *testing.T) { sortedRows.rows = append(sortedRows.rows, IndexedRow{Idx: len(sortedRows.rows), Row: row}) } - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(budget)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(budget)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) sorter := rowsSorter{evalCtx: &evalCtx, rows: sortedRows, ordering: ordering} @@ -933,9 +933,9 @@ func BenchmarkDiskBackedIndexedRowContainer(b *testing.B) { st, ) rows := randgen.MakeIntRows(numRows, numCols) - memoryMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + memoryMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer memoryMonitor.Stop(ctx) - diskMonitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) accessPattern := generateAccessPattern(numRows) diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index bd4f152fc2c1..b7f21e06c2e8 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -423,7 +423,7 @@ func 
newJoinReader( jr.MemMonitor = mon.NewMonitorInheritWithLimit( "joinreader-mem" /* name */, memoryLimit, flowCtx.EvalCtx.Mon, ) - jr.MemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{}) + jr.MemMonitor.StartNoReserved(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon) jr.memAcc = jr.MemMonitor.MakeBoundAccount() if err := jr.initJoinReaderStrategy(flowCtx, rightTypes, readerType); err != nil { @@ -471,7 +471,7 @@ func newJoinReader( jr.streamerInfo.unlimitedMemMonitor = mon.NewMonitorInheritWithLimit( "joinreader-streamer-unlimited" /* name */, math.MaxInt64, flowCtx.EvalCtx.Mon, ) - jr.streamerInfo.unlimitedMemMonitor.Start(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon, mon.BoundAccount{}) + jr.streamerInfo.unlimitedMemMonitor.StartNoReserved(flowCtx.EvalCtx.Ctx(), flowCtx.EvalCtx.Mon) jr.streamerInfo.budgetAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() jr.streamerInfo.txnKVStreamerMemAcc = jr.streamerInfo.unlimitedMemMonitor.MakeBoundAccount() diff --git a/pkg/sql/rowexec/joinreader_test.go b/pkg/sql/rowexec/joinreader_test.go index 40a7d4b3656a..325fa7ff6593 100644 --- a/pkg/sql/rowexec/joinreader_test.go +++ b/pkg/sql/rowexec/joinreader_test.go @@ -1068,7 +1068,7 @@ func TestJoinReader(t *testing.T) { math.MaxInt64, st, ) - diskMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) for i, td := range []catalog.TableDescriptor{tdSecondary, tdFamily} { for _, c := range testCases { @@ -1271,7 +1271,7 @@ CREATE TABLE test.t (a INT, s STRING, INDEX (a, s))`); err != nil { math.MaxInt64, st, ) - diskMonitor.Start(ctx, nil /* pool */, mon.MakeStandaloneBudget(math.MaxInt64)) + diskMonitor.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) defer diskMonitor.Stop(ctx) flowCtx := execinfra.FlowCtx{ EvalCtx: &evalCtx, diff --git a/pkg/sql/rowexec/windower.go b/pkg/sql/rowexec/windower.go index bc88d20b23bd..773e4029166b 100644 --- a/pkg/sql/rowexec/windower.go +++ b/pkg/sql/rowexec/windower.go @@ -157,7 +157,7 @@ func newWindower( limit = memRequiredByWindower } limitedMon := mon.NewMonitorInheritWithLimit("windower-limited", limit, evalCtx.Mon) - limitedMon.Start(ctx, evalCtx.Mon, mon.BoundAccount{}) + limitedMon.StartNoReserved(ctx, evalCtx.Mon) if err := w.InitWithEvalCtx( w, diff --git a/pkg/sql/sem/builtins/show_create_all_tables_builtin_test.go b/pkg/sql/sem/builtins/show_create_all_tables_builtin_test.go index 8a7e00726f35..34258d792cef 100644 --- a/pkg/sql/sem/builtins/show_create_all_tables_builtin_test.go +++ b/pkg/sql/sem/builtins/show_create_all_tables_builtin_test.go @@ -35,7 +35,7 @@ func TestTopologicalSort(t *testing.T) { math.MaxInt64, /* noteworthy */ cluster.MakeTestingClusterSettings(), ) - monitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + monitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) acc := monitor.MakeBoundAccount() testCases := []struct { diff --git a/pkg/sql/sem/eval/context.go b/pkg/sql/sem/eval/context.go index a1f9e8f949ef..6e046b25c081 100644 --- a/pkg/sql/sem/eval/context.go +++ b/pkg/sql/sem/eval/context.go @@ -279,7 +279,7 @@ func MakeTestingEvalContextWithMon(st *cluster.Settings, monitor *mon.BytesMonit Settings: st, NodeID: base.TestingIDContainer, } - monitor.Start(context.Background(), nil /* pool */, mon.MakeStandaloneBudget(math.MaxInt64)) + monitor.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64)) ctx.Mon = monitor 
ctx.Context = context.TODO() now := timeutil.Now() diff --git a/pkg/sql/sqlstats/sslocal/sql_stats.go b/pkg/sql/sqlstats/sslocal/sql_stats.go index f45b5f23e81f..2d518b1a3866 100644 --- a/pkg/sql/sqlstats/sslocal/sql_stats.go +++ b/pkg/sql/sqlstats/sslocal/sql_stats.go @@ -99,7 +99,7 @@ func newSQLStats( } s.mu.apps = make(map[string]*ssmemstorage.Container) s.mu.mon = monitor - s.mu.mon.Start(context.Background(), parentMon, mon.BoundAccount{}) + s.mu.mon.StartNoReserved(context.Background(), parentMon) return s } diff --git a/pkg/sql/stats/row_sampling_test.go b/pkg/sql/stats/row_sampling_test.go index a636a91bbd48..d80baed1e5a3 100644 --- a/pkg/sql/stats/row_sampling_test.go +++ b/pkg/sql/stats/row_sampling_test.go @@ -118,7 +118,7 @@ func TestSampleReservoir(t *testing.T) { math.MaxInt64, st, ) - monitor.Start(ctx, nil, mon.MakeStandaloneBudget(math.MaxInt64)) + monitor.Start(ctx, nil, mon.NewStandaloneBudget(math.MaxInt64)) memAcc := monitor.MakeBoundAccount() expectedK := k if mem == 1<<8 && n > 1 && k > 1 { diff --git a/pkg/sql/txn_fingerprint_id_cache.go b/pkg/sql/txn_fingerprint_id_cache.go index 5f6c154743c7..7f793e4e46db 100644 --- a/pkg/sql/txn_fingerprint_id_cache.go +++ b/pkg/sql/txn_fingerprint_id_cache.go @@ -69,7 +69,7 @@ func NewTxnFingerprintIDCache( monitor := mon.NewMonitorInheritWithLimit("txn-fingerprint-id-cache", 0 /* limit */, parentMon) b.mon = monitor - b.mon.Start(context.Background(), parentMon, mon.BoundAccount{}) + b.mon.StartNoReserved(context.Background(), parentMon) return b } diff --git a/pkg/sql/txn_state.go b/pkg/sql/txn_state.go index 85e7a825fe6c..edc048da31fe 100644 --- a/pkg/sql/txn_state.go +++ b/pkg/sql/txn_state.go @@ -208,7 +208,7 @@ func (ts *txnState) resetForNewSQLTxn( } ts.Ctx, ts.cancel = contextutil.WithCancel(txnCtx) - ts.mon.Start(ts.Ctx, tranCtx.connMon, mon.BoundAccount{} /* reserved */) + ts.mon.StartNoReserved(ts.Ctx, tranCtx.connMon) txnID = func() (txnID uuid.UUID) { ts.mu.Lock() defer ts.mu.Unlock() diff --git a/pkg/sql/txn_state_test.go b/pkg/sql/txn_state_test.go index accddc06cf4a..d8171a7a5e85 100644 --- a/pkg/sql/txn_state_test.go +++ b/pkg/sql/txn_state_test.go @@ -92,7 +92,7 @@ func (tc *testContext) createOpenState(typ txnType) (fsm.State, *txnState) { 1000, /* noteworthy */ cluster.MakeTestingClusterSettings(), ) - txnStateMon.Start(tc.ctx, tc.mon, mon.BoundAccount{}) + txnStateMon.StartNoReserved(tc.ctx, tc.mon) ts := txnState{ Ctx: ctx, diff --git a/pkg/storage/pebble_mvcc_scanner_test.go b/pkg/storage/pebble_mvcc_scanner_test.go index 93ca0bf53890..884119116657 100644 --- a/pkg/storage/pebble_mvcc_scanner_test.go +++ b/pkg/storage/pebble_mvcc_scanner_test.go @@ -174,7 +174,7 @@ func scannerWithAccount( ctx context.Context, st *cluster.Settings, scanner *pebbleMVCCScanner, limitBytes int64, ) (cleanup func()) { m := mon.NewMonitor("test", mon.MemoryResource, nil, nil, 1, math.MaxInt64, st) - m.Start(ctx, nil, mon.MakeStandaloneBudget(limitBytes)) + m.Start(ctx, nil, mon.NewStandaloneBudget(limitBytes)) ba := m.MakeBoundAccount() scanner.memAccount = &ba return func() { diff --git a/pkg/ts/query_test.go b/pkg/ts/query_test.go index 2b80c9e875ec..f6a8f3ab82ff 100644 --- a/pkg/ts/query_test.go +++ b/pkg/ts/query_test.go @@ -403,7 +403,7 @@ func TestQueryWorkerMemoryConstraint(t *testing.T) { math.MaxInt64, cluster.MakeTestingClusterSettings(), ) - adjustedMon.Start(context.Background(), tm.workerMemMonitor, mon.BoundAccount{}) + adjustedMon.StartNoReserved(context.Background(), tm.workerMemMonitor) defer 
adjustedMon.Stop(context.Background()) query := tm.makeQuery("test.metric", resolution1ns, 11, 109) @@ -419,7 +419,7 @@ func TestQueryWorkerMemoryConstraint(t *testing.T) { } { // Limit memory in use by model. Reset memory monitor to get new maximum. adjustedMon.Stop(context.Background()) - adjustedMon.Start(context.Background(), tm.workerMemMonitor, mon.BoundAccount{}) + adjustedMon.StartNoReserved(context.Background(), tm.workerMemMonitor) if adjustedMon.MaximumBytes() != 0 { t.Fatalf("maximum bytes was %d, wanted zero", adjustedMon.MaximumBytes()) } @@ -481,7 +481,7 @@ func TestQueryWorkerMemoryMonitor(t *testing.T) { 100, cluster.MakeTestingClusterSettings(), ) - limitedMon.Start(context.Background(), tm.workerMemMonitor, mon.BoundAccount{}) + limitedMon.StartNoReserved(context.Background(), tm.workerMemMonitor) defer limitedMon.Stop(context.Background()) // Assert correctness with no memory pressure. @@ -503,7 +503,7 @@ func TestQueryWorkerMemoryMonitor(t *testing.T) { // Start/Stop limited monitor to reset maximum allocation. limitedMon.Stop(context.Background()) - limitedMon.Start(context.Background(), tm.workerMemMonitor, mon.BoundAccount{}) + limitedMon.StartNoReserved(context.Background(), tm.workerMemMonitor) var ( memStatsBefore runtime.MemStats diff --git a/pkg/ts/rollup_test.go b/pkg/ts/rollup_test.go index 79fe724ceed2..0468bfce3b88 100644 --- a/pkg/ts/rollup_test.go +++ b/pkg/ts/rollup_test.go @@ -273,7 +273,7 @@ func TestRollupMemoryConstraint(t *testing.T) { math.MaxInt64, cluster.MakeTestingClusterSettings(), ) - adjustedMon.Start(context.Background(), tm.workerMemMonitor, mon.BoundAccount{}) + adjustedMon.StartNoReserved(context.Background(), tm.workerMemMonitor) defer adjustedMon.Stop(context.Background()) // Roll up time series with the new monitor to measure high-water mark @@ -319,7 +319,7 @@ func TestRollupMemoryConstraint(t *testing.T) { // Restart monitor to clear query memory options. adjustedMon.Stop(context.Background()) - adjustedMon.Start(context.Background(), tm.workerMemMonitor, mon.BoundAccount{}) + adjustedMon.StartNoReserved(context.Background(), tm.workerMemMonitor) qmc := MakeQueryMemoryContext(adjustedMon, adjustedMon, QueryMemoryOptions{ // Large budget, but not maximum to avoid overflows. diff --git a/pkg/ts/server.go b/pkg/ts/server.go index a4c79f592552..fa54007bb455 100644 --- a/pkg/ts/server.go +++ b/pkg/ts/server.go @@ -141,12 +141,12 @@ func MakeServer( workerSem: workerSem, } - s.workerMemMonitor.Start(ctx, memoryMonitor, mon.BoundAccount{}) + s.workerMemMonitor.StartNoReserved(ctx, memoryMonitor) stopper.AddCloser(stop.CloserFn(func() { s.workerMemMonitor.Stop(ctx) })) - s.resultMemMonitor.Start(ambient.AnnotateCtx(context.Background()), memoryMonitor, mon.BoundAccount{}) + s.resultMemMonitor.StartNoReserved(ambient.AnnotateCtx(context.Background()), memoryMonitor) stopper.AddCloser(stop.CloserFn(func() { s.resultMemMonitor.Stop(ctx) })) diff --git a/pkg/util/mon/bytes_usage.go b/pkg/util/mon/bytes_usage.go index cdcf71831d09..dd1263324bc4 100644 --- a/pkg/util/mon/bytes_usage.go +++ b/pkg/util/mon/bytes_usage.go @@ -212,7 +212,7 @@ type BytesMonitor struct { // pool, reserved determines the maximum allocation capacity of this // monitor. The reserved bytes are released to their owner monitor // upon Stop. - reserved BoundAccount + reserved *BoundAccount // limit specifies a hard limit on the number of bytes a monitor allows to // be allocated. 
Note that this limit will not be observed if allocations @@ -339,6 +339,11 @@ func NewMonitorInheritWithLimit( ) } +// StartNoReserved is the same as Start when there is no pre-reserved budget. +func (mm *BytesMonitor) StartNoReserved(ctx context.Context, pool *BytesMonitor) { + mm.Start(ctx, pool, &BoundAccount{}) +} + // Start begins a monitoring region. // Arguments: // - pool is the upstream monitor that provision allocations exceeding the @@ -346,7 +351,7 @@ func NewMonitorInheritWithLimit( // and the pre-reserved budget determines the entire capacity of this monitor. // // - reserved is the pre-reserved budget (see above). -func (mm *BytesMonitor) Start(ctx context.Context, pool *BytesMonitor, reserved BoundAccount) { +func (mm *BytesMonitor) Start(ctx context.Context, pool *BytesMonitor, reserved *BoundAccount) { if mm.mu.curAllocated != 0 { panic(fmt.Sprintf("%s: started with %d bytes left over", mm.name, mm.mu.curAllocated)) } @@ -390,7 +395,7 @@ func NewUnlimitedMonitor( limit: math.MaxInt64, noteworthyUsageBytes: noteworthy, poolAllocationSize: DefaultPoolAllocationSize, - reserved: MakeStandaloneBudget(math.MaxInt64), + reserved: NewStandaloneBudget(math.MaxInt64), settings: settings, } m.mu.curBytesCount = curCount @@ -509,10 +514,9 @@ type BoundAccount struct { Mu *syncutil.Mutex } -// MakeStandaloneBudget creates a BoundAccount suitable for root -// monitors. -func MakeStandaloneBudget(capacity int64) BoundAccount { - return BoundAccount{used: capacity} +// NewStandaloneBudget creates a BoundAccount suitable for root monitors. +func NewStandaloneBudget(capacity int64) *BoundAccount { + return &BoundAccount{used: capacity} } // Used returns the number of bytes currently allocated through this account. @@ -605,7 +609,7 @@ func (b *BoundAccount) Clear(ctx context.Context) { return } if b.mon == nil { - // An account created by MakeStandaloneBudget is disconnected from any + // An account created by NewStandaloneBudget is disconnected from any // monitor -- "bytes out of the aether". This needs not be closed. return } @@ -620,7 +624,7 @@ func (b *BoundAccount) Close(ctx context.Context) { return } if b.mon == nil { - // An account created by MakeStandaloneBudget is disconnected from any + // An account created by NewStandaloneBudget is disconnected from any // monitor -- "bytes out of the aether". This needs not be closed. return } diff --git a/pkg/util/mon/bytes_usage_test.go b/pkg/util/mon/bytes_usage_test.go index 19d6fa5b408b..260a525dd38b 100644 --- a/pkg/util/mon/bytes_usage_test.go +++ b/pkg/util/mon/bytes_usage_test.go @@ -48,6 +48,7 @@ func TestMemoryAllocations(t *testing.T) { var paramHeader func() m := NewMonitor("test", MemoryResource, nil, nil, 0, 1000, st) + m.StartNoReserved(ctx, nil /* pool */) accs := make([]BoundAccount, 4) for i := range accs { accs[i] = m.MakeBoundAccount() @@ -142,7 +143,7 @@ func TestMemoryAllocations(t *testing.T) { for _, max := range maxs { pool = NewMonitor("test", MemoryResource, nil, nil, 1, 1000, st) - pool.Start(ctx, nil, MakeStandaloneBudget(max)) + pool.Start(ctx, nil, NewStandaloneBudget(max)) for _, hf := range hysteresisFactors { maxAllocatedButUnusedBlocks = hf @@ -156,7 +157,7 @@ func TestMemoryAllocations(t *testing.T) { // We start with a fresh monitor for every set of // parameters. 
 			m = NewMonitor("test", MemoryResource, nil, nil, pa, 1000, st)
-			m.Start(ctx, pool, MakeStandaloneBudget(pb))
+			m.Start(ctx, pool, NewStandaloneBudget(pb))
 
 			for i := 0; i < numAccountOps; i++ {
 				if i%linesBetweenHeaderReminders == 0 {
@@ -221,7 +222,7 @@ func TestBoundAccount(t *testing.T) {
 	ctx := context.Background()
 	st := cluster.MakeTestingClusterSettings()
 	m := NewMonitor("test", MemoryResource, nil, nil, 1, 1000, st)
-	m.Start(ctx, nil, MakeStandaloneBudget(100))
+	m.Start(ctx, nil, NewStandaloneBudget(100))
 	m.poolAllocationSize = 1
 	maxAllocatedButUnusedBlocks = 1
 
@@ -298,7 +299,7 @@ func TestBytesMonitor(t *testing.T) {
 	ctx := context.Background()
 	st := cluster.MakeTestingClusterSettings()
 	m := NewMonitor("test", MemoryResource, nil, nil, 1, 1000, st)
-	m.Start(ctx, nil, MakeStandaloneBudget(100))
+	m.Start(ctx, nil, NewStandaloneBudget(100))
 	maxAllocatedButUnusedBlocks = 1
 
 	if err := m.reserveBytes(ctx, 10); err != nil {
@@ -332,7 +333,7 @@ func TestBytesMonitor(t *testing.T) {
 	limitedMonitor := NewMonitorWithLimit(
 		"testlimit", MemoryResource, 10, nil, nil, 1, 1000,
 		cluster.MakeTestingClusterSettings())
-	limitedMonitor.Start(ctx, m, BoundAccount{})
+	limitedMonitor.StartNoReserved(ctx, m)
 
 	if err := limitedMonitor.reserveBytes(ctx, 10); err != nil {
 		t.Fatalf("limited monitor refused small allocation: %v", err)
@@ -353,7 +354,7 @@ func TestMemoryAllocationEdgeCases(t *testing.T) {
 	st := cluster.MakeTestingClusterSettings()
 	m := NewMonitor("test", MemoryResource,
 		nil /* curCount */, nil /* maxHist */, 1e9 /* increment */, 1e9 /* noteworthy */, st)
-	m.Start(ctx, nil, MakeStandaloneBudget(1e9))
+	m.Start(ctx, nil, NewStandaloneBudget(1e9))
 
 	a := m.MakeBoundAccount()
 	if err := a.Grow(ctx, 1); err != nil {
@@ -374,10 +375,10 @@ func TestMultiSharedGauge(t *testing.T) {
 
 	parent := NewMonitor("root", MemoryResource, resourceGauge, nil, minAllocation, 0,
 		cluster.MakeTestingClusterSettings())
-	parent.Start(ctx, nil, MakeStandaloneBudget(100000))
+	parent.Start(ctx, nil, NewStandaloneBudget(100000))
 
 	child := NewMonitorInheritWithLimit("child", 20000, parent)
-	child.Start(ctx, parent, BoundAccount{})
+	child.StartNoReserved(ctx, parent)
 
 	acc := child.MakeBoundAccount()
 	require.NoError(t, acc.Grow(ctx, 100))
@@ -385,12 +386,46 @@ func TestMultiSharedGauge(t *testing.T) {
 	require.Equal(t, minAllocation, resourceGauge.Value(), "Metric")
 }
 
+func TestReservedAccountCleared(t *testing.T) {
+	ctx := context.Background()
+	st := cluster.MakeTestingClusterSettings()
+
+	root := NewMonitor(
+		"root" /* name */, MemoryResource, nil /* curCount */, nil, /* maxHist */
+		1 /* increment */, 1000 /* noteworthy */, st,
+	)
+	root.Start(ctx, nil /* pool */, NewStandaloneBudget(math.MaxInt64))
+	root.RelinquishAllOnReleaseBytes()
+
+	// Pre-reserve a budget of 100 bytes.
+	reserved := root.MakeBoundAccount()
+	require.NoError(t, reserved.Grow(ctx, 100))
+
+	m := NewMonitor(
+		"test" /* name */, MemoryResource, nil /* curCount */, nil, /* maxHist */
+		1 /* increment */, 1000 /* noteworthy */, st,
+	)
+	m.Start(ctx, nil /* pool */, &reserved)
+	acc := m.MakeBoundAccount()
+
+	// Grow the account by 50 bytes, then close the account and stop the
+	// monitor.
+	require.NoError(t, acc.Grow(ctx, 50))
+	acc.Close(ctx)
+	m.Stop(ctx)
+
+	// Stopping the monitor should have cleared the reserved account and
+	// returned all pre-reserved memory back to the root monitor.
+	require.Equal(t, int64(0), reserved.used)
+	require.Equal(t, int64(0), root.mu.curBudget.used)
+}
+
 func BenchmarkBoundAccountGrow(b *testing.B) {
 	ctx := context.Background()
 
 	m := NewMonitor("test", MemoryResource,
 		nil /* curCount */, nil /* maxHist */, 1e9 /* increment */, 1e9, /* noteworthy */
 		cluster.MakeTestingClusterSettings())
-	m.Start(ctx, nil, MakeStandaloneBudget(1e9))
+	m.Start(ctx, nil, NewStandaloneBudget(1e9))
 
 	a := m.MakeBoundAccount()
 	for i := 0; i < b.N; i++ {

From f1a8710a2fe8786b19a7f9c26a4f9a1e5e898c55 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Wed, 29 Jun 2022 17:21:58 -0700
Subject: [PATCH 4/4] sql: make sure to close the reserved account for each session

This commit makes it clearer that the "reserved" memory account created
for each connection is closed when that connection is closed.
Previously, the account was already cleared (when the "session root"
monitor is stopped, which happens when the connExecutor stops), so
there was no leak in the accounting system, but this was not obvious;
this commit makes the cleanup explicit.

Release note: None
---
 pkg/sql/conn_executor.go | 12 +++++++++---
 pkg/sql/pgwire/conn.go   |  6 +++---
 2 files changed, 12 insertions(+), 6 deletions(-)

diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go
index c19f82bc8ba3..9582595ca9cc 100644
--- a/pkg/sql/conn_executor.go
+++ b/pkg/sql/conn_executor.go
@@ -746,14 +746,20 @@ func (h ConnectionHandler) GetQueryCancelKey() pgwirecancel.BackendKeyData {
 // embedded in the ConnHandler.
 //
 // If not nil, reserved represents memory reserved for the connection. The
-// connExecutor takes ownership of this memory.
+// connExecutor takes ownership of this memory and will close the account before
+// exiting.
 func (s *Server) ServeConn(
 	ctx context.Context, h ConnectionHandler, reserved *mon.BoundAccount, cancel context.CancelFunc,
 ) error {
-	defer func() {
+	// Make sure to close the reserved account even if closeWrapper below
+	// panics; we do it in a defer that is guaranteed to execute. We also
+	// cannot close it before closeWrapper since we need to close the internal
+	// monitors of the connExecutor first.
+	defer reserved.Close(ctx)
+	defer func(ctx context.Context, h ConnectionHandler) {
 		r := recover()
 		h.ex.closeWrapper(ctx, r)
-	}()
+	}(ctx, h)
 	return h.ex.run(ctx, s.pool, reserved, cancel)
 }
 
diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go
index 65744505e859..3b9f7e551349 100644
--- a/pkg/sql/pgwire/conn.go
+++ b/pkg/sql/pgwire/conn.go
@@ -368,6 +368,7 @@ func (c *conn) serveImpl(
 	} else {
 		// sqlServer == nil means we are in a local test. In this case
 		// we only need the minimum to make pgx happy.
+		defer reserved.Close(ctx)
 		var err error
 		for param, value := range testingStatusReportParams {
 			err = c.sendParamStatus(param, value)
@@ -376,7 +377,6 @@ func (c *conn) serveImpl(
 			}
 		}
 		if err != nil {
-			reserved.Close(ctx)
 			return
 		}
 		var ac AuthConn = authPipe
@@ -387,7 +387,6 @@ func (c *conn) serveImpl(
 
 		procCh = dummyCh
 		if err := c.sendReadyForQuery(0 /* queryCancelKey */); err != nil {
-			reserved.Close(ctx)
 			return
 		}
 	}
@@ -621,7 +620,8 @@ func (c *conn) serveImpl(
 // Args:
 // ac: An interface used by the authentication process to receive password data
 // and to ultimately declare the authentication successful.
-// reserved: Reserved memory. This method takes ownership.
+// reserved: Reserved memory. This method takes ownership and guarantees that it
+// will be closed when this function returns.
// cancelConn: A function to be called when this goroutine exits. Its goal is to // cancel the connection's context, thus stopping the connection's goroutine. // The returned channel is also closed before this goroutine dies, but the
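
For readers unfamiliar with the mon package, the following is a minimal,
self-contained sketch of the post-patch monitor lifecycle. It is not part of
the patch; the monitor names, limits, and budget sizes are illustrative
assumptions, but the signatures it exercises (NewStandaloneBudget returning a
*BoundAccount, Start taking a *BoundAccount, and the StartNoReserved
shorthand) are the ones introduced in the diffs above.

package main

import (
	"context"
	"math"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/mon"
)

func main() {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()

	// A root monitor has no parent pool; it draws from a standalone budget,
	// which after this patch is constructed as a *BoundAccount via
	// NewStandaloneBudget.
	root := mon.NewMonitor("root" /* name */, mon.MemoryResource,
		nil /* curCount */, nil /* maxHist */, 1 /* increment */,
		math.MaxInt64 /* noteworthy */, st)
	root.Start(ctx, nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64))
	defer root.Stop(ctx)

	// A child monitor with no pre-reserved budget uses the new
	// StartNoReserved shorthand instead of passing a zero BoundAccount.
	child := mon.NewMonitorInheritWithLimit("child" /* name */, 0 /* limit */, root)
	child.StartNoReserved(ctx, root)
	defer child.Stop(ctx)

	// Allocations still flow through bound accounts, as before the patch.
	acc := child.MakeBoundAccount()
	defer acc.Close(ctx)
	if err := acc.Grow(ctx, 1024); err != nil {
		panic(err)
	}
}

Passing the reserved budget by pointer is what lets Stop hand the unused
pre-reserved bytes back to the account's owner, which is exactly the behavior
TestReservedAccountCleared verifies above.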