diff --git a/pkg/kv/kvserver/concurrency/concurrency_control.go b/pkg/kv/kvserver/concurrency/concurrency_control.go
index c483a07246ae..6c0488519fb4 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_control.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_control.go
@@ -206,7 +206,7 @@ type RequestSequencer interface {
 	// the request had against conflicting requests and allowing conflicting
 	// requests that are blocked on this one to proceed. The guard should not
 	// be used after being released.
-	FinishReq(*Guard)
+	FinishReq(context.Context, *Guard)
 }
 
 // ContentionHandler is concerned with handling contention-related errors. This
@@ -521,7 +521,7 @@ type latchManager interface {
 	Poison(latchGuard)
 
 	// Release a guard's latches, relinquish its protection from conflicting requests.
-	Release(latchGuard)
+	Release(ctx context.Context, lg latchGuard)
 
 	// Metrics returns information about the state of the latchManager.
 	Metrics() LatchMetrics
diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager.go b/pkg/kv/kvserver/concurrency/concurrency_manager.go
index e7e081c42093..a1df16d313e0 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_manager.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_manager.go
@@ -242,7 +242,7 @@ func (m *managerImpl) SequenceReq(
 	resp, err := m.sequenceReqWithGuard(ctx, g, branch)
 	if resp != nil || err != nil {
 		// Ensure that we release the guard if we return a response or an error.
-		m.FinishReq(g)
+		m.FinishReq(ctx, g)
 		return nil, resp, err
 	}
 	return g, nil, nil
@@ -345,7 +345,7 @@ func (m *managerImpl) sequenceReqWithGuard(
 	// true if ScanOptimistic was called above. Therefore it will also never
 	// be true if latchManager.AcquireOptimistic was called.
 	if g.ltg.ShouldWait() {
-		m.lm.Release(g.moveLatchGuard())
+		m.lm.Release(ctx, g.moveLatchGuard())
 
 		log.Event(ctx, "waiting in lock wait-queues")
 		if err := m.ltw.WaitOn(ctx, g.Req, g.ltg); err != nil {
@@ -429,7 +429,7 @@ func (m *managerImpl) PoisonReq(g *Guard) {
 }
 
 // FinishReq implements the RequestSequencer interface.
-func (m *managerImpl) FinishReq(g *Guard) {
+func (m *managerImpl) FinishReq(ctx context.Context, g *Guard) {
 	// NOTE: we release latches _before_ exiting lock wait-queues deliberately.
 	// Either order would be correct, but the order here avoids non-determinism in
 	// cases where a request A holds both latches and has claimed some keys by
@@ -448,7 +448,7 @@ func (m *managerImpl) FinishReq(g *Guard) {
 	// signaler wakes up (if anyone) will never bump into its mutex immediately
 	// upon resumption.
 	if lg := g.moveLatchGuard(); lg != nil {
-		m.lm.Release(lg)
+		m.lm.Release(ctx, lg)
 	}
 	if ltg := g.moveLockTableGuard(); ltg != nil {
 		m.lt.Dequeue(ltg)
@@ -504,13 +504,13 @@ func (m *managerImpl) HandleLockConflictError(
 	// not releasing lockWaitQueueGuards. We expect the caller of this method to
 	// then re-sequence the Request by calling SequenceReq with the un-latched
 	// Guard. This is analogous to iterating through the loop in SequenceReq.
-	m.lm.Release(g.moveLatchGuard())
+	m.lm.Release(ctx, g.moveLatchGuard())
 
 	// If the discovery process collected a set of intents to resolve before the
 	// next evaluation attempt, do so.
 	if toResolve := g.ltg.ResolveBeforeScanning(); len(toResolve) > 0 {
 		if err := m.ltw.ResolveDeferredIntents(ctx, g.Req.AdmissionHeader, toResolve); err != nil {
-			m.FinishReq(g)
+			m.FinishReq(ctx, g)
 			return nil, err
 		}
 	}
@@ -529,7 +529,7 @@ func (m *managerImpl) HandleTransactionPushError(
 	// caller of this method to then re-sequence the Request by calling
 	// SequenceReq with the un-latched Guard. This is analogous to iterating
 	// through the loop in SequenceReq.
-	m.lm.Release(g.moveLatchGuard())
+	m.lm.Release(ctx, g.moveLatchGuard())
 	return g
 }
diff --git a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go
index 9476e45e19b2..bb1a6f1e531a 100644
--- a/pkg/kv/kvserver/concurrency/concurrency_manager_test.go
+++ b/pkg/kv/kvserver/concurrency/concurrency_manager_test.go
@@ -287,7 +287,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
 			opName := fmt.Sprintf("finish %s", reqName)
 			mon.runSync(opName, func(ctx context.Context) {
 				log.Event(ctx, "finishing request")
-				m.FinishReq(guard)
+				m.FinishReq(ctx, guard)
 				c.mu.Lock()
 				delete(c.guardsByReqName, reqName)
 				c.mu.Unlock()
diff --git a/pkg/kv/kvserver/concurrency/latch_manager.go b/pkg/kv/kvserver/concurrency/latch_manager.go
index 482aedba2877..929c625aee46 100644
--- a/pkg/kv/kvserver/concurrency/latch_manager.go
+++ b/pkg/kv/kvserver/concurrency/latch_manager.go
@@ -66,8 +66,8 @@ func (m *latchManagerImpl) Poison(lg latchGuard) {
 	m.m.Poison(lg.(*spanlatch.Guard))
 }
 
-func (m *latchManagerImpl) Release(lg latchGuard) {
-	m.m.Release(lg.(*spanlatch.Guard))
+func (m *latchManagerImpl) Release(ctx context.Context, lg latchGuard) {
+	m.m.Release(ctx, lg.(*spanlatch.Guard))
 }
 
 func (m *latchManagerImpl) Metrics() LatchMetrics {
diff --git a/pkg/kv/kvserver/concurrency/lock_table_test.go b/pkg/kv/kvserver/concurrency/lock_table_test.go
index c959c7e9950f..5b1d2659cf35 100644
--- a/pkg/kv/kvserver/concurrency/lock_table_test.go
+++ b/pkg/kv/kvserver/concurrency/lock_table_test.go
@@ -1109,7 +1109,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
 	var g lockTableGuard
 	defer func() {
 		if lg != nil {
-			e.lm.Release(lg)
+			e.lm.Release(ctx, lg)
 			lg = nil
 		}
 		if g != nil {
@@ -1135,7 +1135,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
 		if !g.ShouldWait() {
 			break
 		}
-		e.lm.Release(lg)
+		e.lm.Release(ctx, lg)
 		lg = nil
 		var lastID uuid.UUID
 	L:
@@ -1709,6 +1709,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) {
 	var g lockTableGuard
 	var err error
 	firstIter := true
+	ctx := context.Background()
 	for {
 		if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error, item.BaFmt); err != nil {
 			doneCh <- err
@@ -1728,7 +1729,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) {
 			atomic.AddUint64(env.numRequestsWaited, 1)
 			firstIter = false
 		}
-		env.lm.Release(lg)
+		env.lm.Release(ctx, lg)
 		for {
 			<-g.NewStateChan()
 			state, err := g.CurState()
@@ -1751,7 +1752,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) {
 		}
 	}
 	env.lt.Dequeue(g)
-	env.lm.Release(lg)
+	env.lm.Release(ctx, lg)
 	if len(item.locksToAcquire) == 0 {
 		doneCh <- nil
 		return
@@ -1772,7 +1773,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) {
 			return
 		}
 	}
-	env.lm.Release(lg)
+	env.lm.Release(ctx, lg)
 	doneCh <- nil
 }
diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go
index 743162b9138a..7038e803b412 100644
--- a/pkg/kv/kvserver/replica_send.go
+++ b/pkg/kv/kvserver/replica_send.go
@@ -431,7 +431,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 	defer func() {
 		// NB: wrapped to delay g evaluation to its value when returning.
 		if g != nil {
-			r.concMgr.FinishReq(g)
+			r.concMgr.FinishReq(ctx, g)
 		}
 	}()
 	pp := poison.Policy_Error
@@ -531,7 +531,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 			if reuseLatchAndLockSpans {
 				latchSpans, lockSpans = g.TakeSpanSets()
 			}
-			r.concMgr.FinishReq(g)
+			r.concMgr.FinishReq(ctx, g)
 			g = nil
 		}
 	}
@@ -1365,6 +1365,6 @@ func (ec *endCmds) done(
 	// this method is called and the Guard is not set. Consider removing this
 	// check and upgrading the previous observation to an invariant.
 	if ec.g != nil {
-		ec.repl.concMgr.FinishReq(ec.g)
+		ec.repl.concMgr.FinishReq(ctx, ec.g)
 	}
 }
diff --git a/pkg/kv/kvserver/spanlatch/manager.go b/pkg/kv/kvserver/spanlatch/manager.go
index b92cc8534e5d..973805d1f765 100644
--- a/pkg/kv/kvserver/spanlatch/manager.go
+++ b/pkg/kv/kvserver/spanlatch/manager.go
@@ -242,7 +242,7 @@ func (m *Manager) Acquire(
 	err := m.wait(ctx, lg, snap)
 	if err != nil {
-		m.Release(lg)
+		m.Release(ctx, lg)
 		return nil, err
 	}
 	return lg, nil
@@ -362,7 +362,7 @@ func (m *Manager) WaitUntilAcquired(ctx context.Context, lg *Guard) (*Guard, err
 	}()
 	err := m.wait(ctx, lg, *lg.snap)
 	if err != nil {
-		m.Release(lg)
+		m.Release(ctx, lg)
 		return nil, err
 	}
 	return lg, nil
@@ -623,7 +623,7 @@ func (m *Manager) Poison(lg *Guard) {
 // Release releases the latches held by the provided Guard. After being called,
 // dependent latch acquisition attempts can complete if not blocked on any other
 // owned latches.
-func (m *Manager) Release(lg *Guard) {
+func (m *Manager) Release(ctx context.Context, lg *Guard) {
 	lg.done.signal()
 	if lg.snap != nil {
 		lg.snap.close()
@@ -637,9 +637,9 @@ func (m *Manager) Release(lg *Guard) {
 		const longLatchHeldMsg = "%s has held latch for %d ns. Some possible causes are " +
 			"slow disk reads, slow raft replication, and expensive request processing."
 		if m.everySecondLogger.ShouldLog() {
-			log.Warningf(context.Background(), longLatchHeldMsg, lg.baFmt, held)
+			log.Warningf(ctx, longLatchHeldMsg, lg.baFmt, held)
 		} else {
-			log.VEventf(context.Background(), 2, longLatchHeldMsg, lg.baFmt, held)
+			log.VEventf(ctx, 2, longLatchHeldMsg, lg.baFmt, held)
 		}
 	}
 }
diff --git a/pkg/kv/kvserver/spanlatch/manager_test.go b/pkg/kv/kvserver/spanlatch/manager_test.go
index abac3c4a8d24..98130ee246fb 100644
--- a/pkg/kv/kvserver/spanlatch/manager_test.go
+++ b/pkg/kv/kvserver/spanlatch/manager_test.go
@@ -129,7 +129,7 @@ func (m *Manager) MustAcquireChExt(
 	go func() {
 		err := m.wait(ctx, lg, snap)
 		if err != nil {
-			m.Release(lg)
+			m.Release(ctx, lg)
 		}
 		errCh <- err
 	}()
@@ -140,12 +140,13 @@ func TestLatchManager(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	var m Manager
+	ctx := context.Background()
 
 	// Try latches with no overlapping already-acquired latches.
 	lg1 := m.MustAcquire(spans("a", "", write, zeroTS))
-	m.Release(lg1)
+	m.Release(ctx, lg1)
 
 	lg2 := m.MustAcquire(spans("a", "b", write, zeroTS))
-	m.Release(lg2)
+	m.Release(ctx, lg2)
 
 	// Add a latch and verify overlapping latches wait on it.
 	lg3 := m.MustAcquire(spans("a", "b", write, zeroTS))
@@ -155,14 +156,14 @@ func TestLatchManager(t *testing.T) {
 	testLatchBlocks(t, lg4C)
 
 	// First write completes, second grabs latch.
-	m.Release(lg3)
+	m.Release(ctx, lg3)
 	testLatchSucceeds(t, lg4C)
 }
 
 func TestLatchManagerAcquireOverlappingSpans(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	var m Manager
-
+	ctx := context.Background()
 	// Acquire overlapping latches with different access patterns.
 	// |----------| <- Read latch [a-c)@t1
 	// |----------| <- Write latch [b-d)@t1
@@ -179,7 +180,7 @@ func TestLatchManagerAcquireOverlappingSpans(t *testing.T) {
 	lg2C := m.MustAcquireCh(spans("a", "b", read, ts0))
 	lg2 := testLatchSucceeds(t, lg2C)
-	m.Release(lg2)
+	m.Release(ctx, lg2)
 
 	// We acquire reads at lower timestamps than writes to check for blocked
 	// acquisitions based on the original latch, not the latches declared in
@@ -195,11 +196,11 @@ func TestLatchManagerAcquireOverlappingSpans(t *testing.T) {
 		testLatchBlocks(t, lgC)
 	}
 
-	m.Release(lg1)
+	m.Release(ctx, lg1)
 
 	for _, lgC := range attempts {
 		lg := testLatchSucceeds(t, lgC)
-		m.Release(lg)
+		m.Release(ctx, lg)
 	}
 }
 
@@ -207,6 +208,7 @@ func TestLatchManagerAcquiringReadsVaryingTimestamps(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	var m Manager
+	ctx := context.Background()
 	var ts0, ts1 = hlc.Timestamp{WallTime: 0}, hlc.Timestamp{WallTime: 1}
 	var spanSet spanset.SpanSet
 	add(&spanSet, "a", "", read, ts0)
@@ -216,7 +218,7 @@ func TestLatchManagerAcquiringReadsVaryingTimestamps(t *testing.T) {
 	for _, walltime := range []int64{0, 1, 2} {
 		ts := hlc.Timestamp{WallTime: walltime}
 		lg := testLatchSucceeds(t, m.MustAcquireCh(spans("a", "", read, ts)))
-		m.Release(lg)
+		m.Release(ctx, lg)
 	}
 
 	var attempts []Attempt
@@ -229,11 +231,11 @@ func TestLatchManagerAcquiringReadsVaryingTimestamps(t *testing.T) {
 		testLatchBlocks(t, lgC)
 	}
 
-	m.Release(lg1)
+	m.Release(ctx, lg1)
 
 	for _, lgC := range attempts {
 		lg := testLatchSucceeds(t, lgC)
-		m.Release(lg)
+		m.Release(ctx, lg)
 	}
 }
 
@@ -252,6 +254,7 @@ func TestLatchManagerWriteWaitForMultipleReads(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	var m Manager
+	ctx := context.Background()
 	// Acquire latch for read-only span.
 	lg1 := m.MustAcquire(spans("a", "", read, zeroTS))
 	// Acquire another one on top.
@@ -264,13 +267,13 @@ func TestLatchManagerWriteWaitForMultipleReads(t *testing.T) {
 	testLatchBlocks(t, lg3C)
 
 	// The second read releases latch, but the first one remains.
-	m.Release(lg2)
+	m.Release(ctx, lg2)
 
 	// Should still block.
 	testLatchBlocks(t, lg3C)
 
 	// First read releases latch.
-	m.Release(lg1)
+	m.Release(ctx, lg1)
 
 	// Now it goes through.
 	testLatchSucceeds(t, lg3C)
@@ -280,6 +283,7 @@ func TestLatchManagerMultipleOverlappingLatches(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	var m Manager
+	ctx := context.Background()
 	// Acquire multiple latches.
 	a1 := m.MustAcquireCh(spans("a", "", write, zeroTS))
 	a2 := m.MustAcquireCh(spans("b", "c", write, zeroTS))
@@ -288,11 +292,11 @@ func TestLatchManagerMultipleOverlappingLatches(t *testing.T) {
 	// Attempt to acquire latch which overlaps them all.
 	lg4C := m.MustAcquireCh(spans("0", "z", write, zeroTS))
 	testLatchBlocks(t, lg4C)
-	m.Release(testLatchSucceeds(t, a1))
+	m.Release(ctx, testLatchSucceeds(t, a1))
 	testLatchBlocks(t, lg4C)
-	m.Release(testLatchSucceeds(t, a2))
+	m.Release(ctx, testLatchSucceeds(t, a2))
 	testLatchBlocks(t, lg4C)
-	m.Release(testLatchSucceeds(t, a3))
+	m.Release(ctx, testLatchSucceeds(t, a3))
 	testLatchSucceeds(t, lg4C)
 }
 
@@ -300,6 +304,7 @@ func TestLatchManagerMultipleOverlappingSpans(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	var m Manager
+	ctx := context.Background()
 	// Acquire multiple latches.
 	lg1 := m.MustAcquire(spans("a", "", write, zeroTS))
 	lg2 := m.MustAcquire(spans("b", "c", read, zeroTS))
@@ -315,14 +320,14 @@ func TestLatchManagerMultipleOverlappingSpans(t *testing.T) {
 	// Blocks until the first three prerequisite latches release.
 	testLatchBlocks(t, lg5C)
-	m.Release(lg2)
+	m.Release(ctx, lg2)
 	testLatchBlocks(t, lg5C)
-	m.Release(lg3)
+	m.Release(ctx, lg3)
 	testLatchBlocks(t, lg5C)
-	m.Release(lg1)
+	m.Release(ctx, lg1)
 	lg5 := testLatchSucceeds(t, lg5C)
-	m.Release(lg4)
-	m.Release(lg5)
+	m.Release(ctx, lg4)
+	m.Release(ctx, lg5)
 }
 
 func TestLatchManagerDependentLatches(t *testing.T) {
@@ -497,6 +502,7 @@ func TestLatchManagerDependentLatches(t *testing.T) {
 			dependent: false,
 		},
 	}
+	ctx := context.Background()
 	for _, c := range cases {
 		t.Run(c.name, func(t *testing.T) {
 			testutils.RunTrueAndFalse(t, "inv", func(t *testing.T, inv bool) {
@@ -510,13 +516,13 @@ func TestLatchManagerDependentLatches(t *testing.T) {
 				lg2C := m.MustAcquireCh(c.sp2)
 				if c.dependent {
 					testLatchBlocks(t, lg2C)
-					m.Release(lg1)
+					m.Release(ctx, lg1)
 					lg2 := testLatchSucceeds(t, lg2C)
-					m.Release(lg2)
+					m.Release(ctx, lg2)
 				} else {
 					lg2 := testLatchSucceeds(t, lg2C)
-					m.Release(lg1)
-					m.Release(lg2)
+					m.Release(ctx, lg1)
+					m.Release(ctx, lg2)
 				}
 			})
 		})
@@ -573,9 +579,9 @@ func TestLatchManagerPoison(t *testing.T) {
 	// Release ga1, which allows ga4 to sequence. At that point, we can check
 	// directly that it is poisoned.
-	m.Release(ga1)
+	m.Release(ctx, ga1)
 	ga4 := testLatchSucceeds(t, a4)
-	m.Release(ga4)
+	m.Release(ctx, ga4)
 }
 
 func TestLatchManagerContextCancellation(t *testing.T) {
@@ -601,7 +607,7 @@ func TestLatchManagerContextCancellation(t *testing.T) {
 	testLatchBlocks(t, a3C)
 
 	// Release the first latch. The third succeeds in acquiring the latch.
-	m.Release(lg1)
+	m.Release(ctx2, lg1)
 	testLatchSucceeds(t, a3C)
 }
 
@@ -609,10 +615,11 @@ func TestLatchManagerOptimistic(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	var m Manager
+	ctx := context.Background()
 	// Acquire latches, no conflict.
 	lg1 := m.AcquireOptimistic(spans("d", "f", write, zeroTS), poison.Policy_Error, nil)
 	require.True(t, m.CheckOptimisticNoConflicts(lg1, spans("d", "f", write, zeroTS)), poison.Policy_Error)
-	lg1, err := m.WaitUntilAcquired(context.Background(), lg1)
+	lg1, err := m.WaitUntilAcquired(ctx, lg1)
 	require.NoError(t, err)
 
 	// Optimistic acquire encounters conflict in some cases.
@@ -622,25 +629,25 @@ func TestLatchManagerOptimistic(t *testing.T) {
 	waitUntilAcquiredCh := func(g *Guard) Attempt {
 		errCh := make(chan error, 1)
 		go func() {
-			_, err := m.WaitUntilAcquired(context.Background(), g)
+			_, err := m.WaitUntilAcquired(ctx, g)
 			errCh <- err
 		}()
 		return Attempt{lg: g, errCh: errCh}
 	}
 	a2 := waitUntilAcquiredCh(lg2)
 	testLatchBlocks(t, a2)
-	m.Release(lg1)
+	m.Release(ctx, lg1)
 	testLatchSucceeds(t, a2)
 
 	// Optimistic acquire encounters conflict.
 	lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error, nil)
 	require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS)))
-	m.Release(lg2)
+	m.Release(ctx, lg2)
 	// There is still a conflict even though lg2 has been released.
 	require.False(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS)))
-	lg3, err = m.WaitUntilAcquired(context.Background(), lg3)
+	lg3, err = m.WaitUntilAcquired(ctx, lg3)
 	require.NoError(t, err)
-	m.Release(lg3)
+	m.Release(ctx, lg3)
 
 	// Optimistic acquire for read below write encounters no conflict.
 	oneTS, twoTS := hlc.Timestamp{WallTime: 1}, hlc.Timestamp{WallTime: 2}
@@ -648,40 +655,41 @@ func TestLatchManagerOptimistic(t *testing.T) {
 	lg5 := m.AcquireOptimistic(spans("a", "e", read, oneTS), poison.Policy_Error, nil)
 	require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "e", read, oneTS)))
 	require.True(t, m.CheckOptimisticNoConflicts(lg5, spans("a", "c", read, oneTS)))
-	lg5, err = m.WaitUntilAcquired(context.Background(), lg5)
+	lg5, err = m.WaitUntilAcquired(ctx, lg5)
 	require.NoError(t, err)
-	m.Release(lg5)
-	m.Release(lg4)
+	m.Release(ctx, lg5)
+	m.Release(ctx, lg4)
 }
 
 func TestLatchManagerWaitFor(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	var m Manager
+	ctx := context.Background()
 	// Acquire latches, no conflict.
-	lg1, err := m.Acquire(context.Background(), spans("d", "f", write, zeroTS), poison.Policy_Error, nil)
+	lg1, err := m.Acquire(ctx, spans("d", "f", write, zeroTS), poison.Policy_Error, nil)
 	require.NoError(t, err)
 
 	// See if WaitFor waits for above latch.
 	waitForCh := func() Attempt {
 		errCh := make(chan error)
 		go func() {
-			errCh <- m.WaitFor(context.Background(), spans("a", "e", read, zeroTS), poison.Policy_Error, nil)
+			errCh <- m.WaitFor(ctx, spans("a", "e", read, zeroTS), poison.Policy_Error, nil)
 		}()
 		return Attempt{errCh: errCh}
 	}
 	ch2 := waitForCh()
 	testLatchBlocks(t, ch2)
-	m.Release(lg1)
+	m.Release(ctx, lg1)
 	testLatchSucceeds(t, ch2)
 
 	// Optimistic acquire should _not_ encounter conflict - as WaitFor should
 	// not lay any latches.
 	lg3 := m.AcquireOptimistic(spans("a", "e", write, zeroTS), poison.Policy_Error, nil)
 	require.True(t, m.CheckOptimisticNoConflicts(lg3, spans("a", "e", write, zeroTS)))
-	lg3, err = m.WaitUntilAcquired(context.Background(), lg3)
+	lg3, err = m.WaitUntilAcquired(ctx, lg3)
 	require.NoError(t, err)
-	m.Release(lg3)
+	m.Release(ctx, lg3)
 }
 
 func BenchmarkLatchManagerReadOnlyMix(b *testing.B) {
@@ -702,6 +710,7 @@ func BenchmarkLatchManagerReadOnlyMix(b *testing.B) {
 }
 
 func BenchmarkLatchManagerReadWriteMix(b *testing.B) {
+	ctx := context.Background()
 	for _, readsPerWrite := range []int{0, 1, 4, 16, 64, 128, 256} {
 		b.Run(fmt.Sprintf("readsPerWrite=%d", readsPerWrite), func(b *testing.B) {
 			var m Manager
@@ -728,7 +737,7 @@ func BenchmarkLatchManagerReadWriteMix(b *testing.B) {
 				lg, snap := m.sequence(&spans[i], poison.Policy_Error, nil)
 				snap.close()
 				if len(lgBuf) == cap(lgBuf) {
-					m.Release(<-lgBuf)
+					m.Release(ctx, <-lgBuf)
 				}
 				lgBuf <- lg
 			}
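
The change above is plain context plumbing: spanlatch.Manager.Release, and the latchManager.Release and RequestSequencer.FinishReq wrappers layered on top of it, now take the caller's ctx, so the "has held latch for %d ns" warning is logged against the request's context instead of context.Background(). The Go sketch below only illustrates that idea under simplified assumptions; Guard, Manager, baDesc, and longHeldThreshold are hypothetical stand-ins, not the CockroachDB types.

// Minimal sketch (not the CockroachDB implementation) of a release path that
// accepts the caller's context so diagnostics are attributed to the request.
package main

import (
	"context"
	"fmt"
	"time"
)

// Guard is a simplified stand-in for a latch guard: it records when the latch
// was acquired and a printable description of the request holding it.
type Guard struct {
	acquired time.Time
	baDesc   string
}

// Manager is a simplified stand-in for a latch manager.
type Manager struct {
	longHeldThreshold time.Duration
}

// Release relinquishes the guard's latches. Accepting ctx mirrors the diff:
// anything logged on release carries the releasing request's context (its log
// tags and trace span) rather than an anonymous background context.
func (m *Manager) Release(ctx context.Context, lg *Guard) {
	if held := time.Since(lg.acquired); held > m.longHeldThreshold {
		// CockroachDB logs via log.Warningf(ctx, ...) / log.VEventf(ctx, ...);
		// this sketch only prints, but the point is that ctx is the caller's.
		fmt.Printf("%s has held latch for %d ns (ctx err: %v)\n",
			lg.baDesc, held.Nanoseconds(), ctx.Err())
	}
	// ... drop the latches and signal waiters here ...
}

func main() {
	ctx := context.Background() // a request-scoped ctx in real usage
	m := &Manager{longHeldThreshold: time.Millisecond}
	lg := &Guard{acquired: time.Now().Add(-time.Second), baDesc: "Scan [a,b)"}
	m.Release(ctx, lg)
}

Call sites simply pass whatever ctx they already hold where the latch is released, which is exactly what the mechanical call-site updates throughout this diff do.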