kv: supply context to manager release
This commit plumbs the context from the concurrency manager's FinishReq method
through to the latch manager's Release method. The goal is to supply the right
context when logging events on latch release.

Informs: cockroachdb#114609

Release note: None
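
To illustrate the motivation, here is a minimal, self-contained Go sketch (not CockroachDB code; ctxKeyReqID and both release functions are hypothetical stand-ins): when the release path takes no context, it can only log against context.Background(), which carries none of the request's log tags or trace span; once the caller's ctx is threaded through, release-time log events are attributed to the originating request.

package main

import (
	"context"
	"log"
)

// ctxKeyReqID is a hypothetical context key standing in for the log tags and
// trace span that a real request context carries.
type ctxKeyReqID struct{}

// releaseOld mimics the pre-commit shape: with no ctx parameter, the release
// path can only log against context.Background(), losing request-scoped data.
func releaseOld() {
	log.Print("latch released (no request context available)")
}

// releaseNew mimics the post-commit shape: the caller's ctx travels with the
// call, so release-time events can be attributed to the originating request.
func releaseNew(ctx context.Context) {
	if reqID, ok := ctx.Value(ctxKeyReqID{}).(string); ok {
		log.Printf("latch released (request %s)", reqID)
	}
}

func main() {
	ctx := context.WithValue(context.Background(), ctxKeyReqID{}, "req-42")
	releaseOld()
	releaseNew(ctx)
}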
lyang24 authored and wenyihu6 committed Feb 21, 2024
1 parent 30d6a4d commit abce40d
Showing 8 changed files with 78 additions and 68 deletions.
pkg/kv/kvserver/concurrency/concurrency_control.go (4 changes: 2 additions & 2 deletions)

@@ -206,7 +206,7 @@ type RequestSequencer interface {
 	// the request had against conflicting requests and allowing conflicting
 	// requests that are blocked on this one to proceed. The guard should not
 	// be used after being released.
-	FinishReq(*Guard)
+	FinishReq(context.Context, *Guard)
 }
 
 // ContentionHandler is concerned with handling contention-related errors. This
@@ -521,7 +521,7 @@ type latchManager interface {
 	Poison(latchGuard)
 
 	// Release a guard's latches, relinquish its protection from conflicting requests.
-	Release(latchGuard)
+	Release(ctx context.Context, lg latchGuard)
 
 	// Metrics returns information about the state of the latchManager.
 	Metrics() LatchMetrics
pkg/kv/kvserver/concurrency/concurrency_manager.go (14 changes: 7 additions & 7 deletions)

@@ -242,7 +242,7 @@ func (m *managerImpl) SequenceReq(
 	resp, err := m.sequenceReqWithGuard(ctx, g, branch)
 	if resp != nil || err != nil {
 		// Ensure that we release the guard if we return a response or an error.
-		m.FinishReq(g)
+		m.FinishReq(ctx, g)
 		return nil, resp, err
 	}
 	return g, nil, nil
@@ -345,7 +345,7 @@ func (m *managerImpl) sequenceReqWithGuard(
 		// true if ScanOptimistic was called above. Therefore it will also never
 		// be true if latchManager.AcquireOptimistic was called.
 		if g.ltg.ShouldWait() {
-			m.lm.Release(g.moveLatchGuard())
+			m.lm.Release(ctx, g.moveLatchGuard())
 
 			log.Event(ctx, "waiting in lock wait-queues")
 			if err := m.ltw.WaitOn(ctx, g.Req, g.ltg); err != nil {
@@ -429,7 +429,7 @@ func (m *managerImpl) PoisonReq(g *Guard) {
 }
 
 // FinishReq implements the RequestSequencer interface.
-func (m *managerImpl) FinishReq(g *Guard) {
+func (m *managerImpl) FinishReq(ctx context.Context, g *Guard) {
 	// NOTE: we release latches _before_ exiting lock wait-queues deliberately.
 	// Either order would be correct, but the order here avoids non-determinism in
 	// cases where a request A holds both latches and has claimed some keys by
@@ -448,7 +448,7 @@ func (m *managerImpl) FinishReq(g *Guard) {
 	// signaler wakes up (if anyone) will never bump into its mutex immediately
 	// upon resumption.
 	if lg := g.moveLatchGuard(); lg != nil {
-		m.lm.Release(lg)
+		m.lm.Release(ctx, lg)
 	}
 	if ltg := g.moveLockTableGuard(); ltg != nil {
 		m.lt.Dequeue(ltg)
@@ -504,13 +504,13 @@ func (m *managerImpl) HandleLockConflictError(
 	// not releasing lockWaitQueueGuards. We expect the caller of this method to
 	// then re-sequence the Request by calling SequenceReq with the un-latched
 	// Guard. This is analogous to iterating through the loop in SequenceReq.
-	m.lm.Release(g.moveLatchGuard())
+	m.lm.Release(ctx, g.moveLatchGuard())
 
 	// If the discovery process collected a set of intents to resolve before the
 	// next evaluation attempt, do so.
 	if toResolve := g.ltg.ResolveBeforeScanning(); len(toResolve) > 0 {
 		if err := m.ltw.ResolveDeferredIntents(ctx, g.Req.AdmissionHeader, toResolve); err != nil {
-			m.FinishReq(g)
+			m.FinishReq(ctx, g)
 			return nil, err
 		}
 	}
@@ -529,7 +529,7 @@ func (m *managerImpl) HandleTransactionPushError(
 	// caller of this method to then re-sequence the Request by calling
 	// SequenceReq with the un-latched Guard. This is analogous to iterating
 	// through the loop in SequenceReq.
-	m.lm.Release(g.moveLatchGuard())
+	m.lm.Release(ctx, g.moveLatchGuard())
 	return g
 }
pkg/kv/kvserver/concurrency/concurrency_manager_test.go (2 changes: 1 addition & 1 deletion)

@@ -287,7 +287,7 @@ func TestConcurrencyManagerBasic(t *testing.T) {
 			opName := fmt.Sprintf("finish %s", reqName)
 			mon.runSync(opName, func(ctx context.Context) {
 				log.Event(ctx, "finishing request")
-				m.FinishReq(guard)
+				m.FinishReq(ctx, guard)
 				c.mu.Lock()
 				delete(c.guardsByReqName, reqName)
 				c.mu.Unlock()
pkg/kv/kvserver/concurrency/latch_manager.go (4 changes: 2 additions & 2 deletions)

@@ -66,8 +66,8 @@ func (m *latchManagerImpl) Poison(lg latchGuard) {
 	m.m.Poison(lg.(*spanlatch.Guard))
 }
 
-func (m *latchManagerImpl) Release(lg latchGuard) {
-	m.m.Release(lg.(*spanlatch.Guard))
+func (m *latchManagerImpl) Release(ctx context.Context, lg latchGuard) {
+	m.m.Release(ctx, lg.(*spanlatch.Guard))
 }
 
 func (m *latchManagerImpl) Metrics() LatchMetrics {
pkg/kv/kvserver/concurrency/lock_table_test.go (11 changes: 6 additions & 5 deletions)

@@ -1109,7 +1109,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
 	var g lockTableGuard
 	defer func() {
 		if lg != nil {
-			e.lm.Release(lg)
+			e.lm.Release(ctx, lg)
 			lg = nil
 		}
 		if g != nil {
@@ -1135,7 +1135,7 @@ func doWork(ctx context.Context, item *workItem, e *workloadExecutor) error {
 		if !g.ShouldWait() {
 			break
 		}
-		e.lm.Release(lg)
+		e.lm.Release(ctx, lg)
 		lg = nil
 		var lastID uuid.UUID
 	L:
@@ -1709,6 +1709,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) {
 	var g lockTableGuard
 	var err error
 	firstIter := true
+	ctx := context.Background()
 	for {
 		if lg, err = env.lm.Acquire(context.Background(), item.LatchSpans, poison.Policy_Error, item.BaFmt); err != nil {
 			doneCh <- err
@@ -1728,7 +1729,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) {
 		atomic.AddUint64(env.numRequestsWaited, 1)
 		firstIter = false
 	}
-	env.lm.Release(lg)
+	env.lm.Release(ctx, lg)
 	for {
 		<-g.NewStateChan()
 		state, err := g.CurState()
@@ -1751,7 +1752,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) {
 		}
 	}
 	env.lt.Dequeue(g)
-	env.lm.Release(lg)
+	env.lm.Release(ctx, lg)
 	if len(item.locksToAcquire) == 0 {
 		doneCh <- nil
 		return
@@ -1772,7 +1773,7 @@ func doBenchWork(item *benchWorkItem, env benchEnv, doneCh chan<- error) {
 			return
 		}
 	}
-	env.lm.Release(lg)
+	env.lm.Release(ctx, lg)
 	doneCh <- nil
 }
pkg/kv/kvserver/replica_send.go (6 changes: 3 additions & 3 deletions)

@@ -431,7 +431,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 	defer func() {
 		// NB: wrapped to delay g evaluation to its value when returning.
 		if g != nil {
-			r.concMgr.FinishReq(g)
+			r.concMgr.FinishReq(ctx, g)
 		}
 	}()
 	pp := poison.Policy_Error
@@ -531,7 +531,7 @@ func (r *Replica) executeBatchWithConcurrencyRetries(
 			if reuseLatchAndLockSpans {
 				latchSpans, lockSpans = g.TakeSpanSets()
 			}
-			r.concMgr.FinishReq(g)
+			r.concMgr.FinishReq(ctx, g)
 			g = nil
 		}
 	}
@@ -1365,6 +1365,6 @@ func (ec *endCmds) done(
 	// this method is called and the Guard is not set. Consider removing this
 	// check and upgrading the previous observation to an invariant.
 	if ec.g != nil {
-		ec.repl.concMgr.FinishReq(ec.g)
+		ec.repl.concMgr.FinishReq(ctx, ec.g)
 	}
 }
pkg/kv/kvserver/spanlatch/manager.go (10 changes: 5 additions & 5 deletions)

@@ -242,7 +242,7 @@ func (m *Manager) Acquire(
 
 	err := m.wait(ctx, lg, snap)
 	if err != nil {
-		m.Release(lg)
+		m.Release(ctx, lg)
 		return nil, err
 	}
 	return lg, nil
@@ -362,7 +362,7 @@ func (m *Manager) WaitUntilAcquired(ctx context.Context, lg *Guard) (*Guard, err
 	}()
 	err := m.wait(ctx, lg, *lg.snap)
 	if err != nil {
-		m.Release(lg)
+		m.Release(ctx, lg)
 		return nil, err
 	}
 	return lg, nil
@@ -623,7 +623,7 @@ func (m *Manager) Poison(lg *Guard) {
 // Release releases the latches held by the provided Guard. After being called,
 // dependent latch acquisition attempts can complete if not blocked on any other
 // owned latches.
-func (m *Manager) Release(lg *Guard) {
+func (m *Manager) Release(ctx context.Context, lg *Guard) {
 	lg.done.signal()
 	if lg.snap != nil {
 		lg.snap.close()
@@ -637,9 +637,9 @@ func (m *Manager) Release(lg *Guard) {
 	const longLatchHeldMsg = "%s has held latch for %d ns. Some possible causes are " +
 		"slow disk reads, slow raft replication, and expensive request processing."
 	if m.everySecondLogger.ShouldLog() {
-		log.Warningf(context.Background(), longLatchHeldMsg, lg.baFmt, held)
+		log.Warningf(ctx, longLatchHeldMsg, lg.baFmt, held)
 	} else {
-		log.VEventf(context.Background(), 2, longLatchHeldMsg, lg.baFmt, held)
+		log.VEventf(ctx, 2, longLatchHeldMsg, lg.baFmt, held)
 	}
 }
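
As a usage note, below is a simplified, runnable mirror of the new call chain (FinishReq -> Release). All types and names are illustrative stand-ins, not the actual CockroachDB types; the point is that the request's ctx now reaches the release path, so the long-latch-held warning above is attributed to the request rather than to a bare background context.

package main

import (
	"context"
	"fmt"
)

// latchGuard and guard are illustrative stand-ins for the latchGuard and
// concurrency.Guard types touched by this commit.
type latchGuard struct{}

type guard struct{ lg *latchGuard }

// moveLatchGuard transfers ownership of the latch guard out of the request
// guard, loosely mirroring Guard.moveLatchGuard in the real code.
func (g *guard) moveLatchGuard() *latchGuard {
	lg := g.lg
	g.lg = nil
	return lg
}

// reqKey is a hypothetical context key standing in for request-scoped data.
type reqKey struct{}

// release mirrors latchManager.Release after this commit: it accepts the
// request's ctx so any logging on the release path can be request-scoped.
func release(ctx context.Context, lg *latchGuard) {
	fmt.Printf("released latches (request=%v)\n", ctx.Value(reqKey{}))
}

// finishReq mirrors managerImpl.FinishReq after this commit: the caller's ctx
// is threaded down to release instead of synthesizing a background context.
func finishReq(ctx context.Context, g *guard) {
	if lg := g.moveLatchGuard(); lg != nil {
		release(ctx, lg)
	}
}

func main() {
	ctx := context.WithValue(context.Background(), reqKey{}, "batch-1")
	finishReq(ctx, &guard{lg: &latchGuard{}})
}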