From f5bca80a363c9eae5e6ee6e4d93ca21dbb0c1459 Mon Sep 17 00:00:00 2001 From: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com> Date: Thu, 14 Sep 2023 15:50:36 -0400 Subject: [PATCH] kvflowcontrol: performance optimizations in tracker.go This patch makes 2 main performance optimizations in the tracker codepath: 1. Replace the trackedList with a struct that points to a list. This avoids map assignments each time anything is appended to the list. 2. We use a sync.Pool for tracked items to avoid frequent allocations and gc for the objects. Informs #104154. Release note: None --- .../kvflowtokentracker/tracker.go | 72 ++++++++++++------- 1 file changed, 47 insertions(+), 25 deletions(-) diff --git a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go index 1cfc39afc235..113807dac75e 100644 --- a/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go +++ b/pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go @@ -15,6 +15,7 @@ import ( "fmt" "sort" "strings" + "sync" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb" @@ -29,11 +30,7 @@ import ( // admissionpb.WorkPriority, for replication along an individual // kvflowcontrol.Stream. type Tracker struct { - // TODO(irfansharif,aaditya): Everytime we track something, we incur a map - // assignment (shows up in CPU profiles). We could introduce a struct that - // internally embeds this list of tracked deductions, and append there - // instead. Do this as part of #104154. - trackedM map[admissionpb.WorkPriority][]tracked + trackedM map[admissionpb.WorkPriority]*trackedList // lowerBound tracks on a per-stream basis the log position below which // we ignore token deductions. @@ -44,6 +41,24 @@ type Tracker struct { knobs *kvflowcontrol.TestingKnobs } +var trackedPool sync.Pool = sync.Pool{ + New: func() interface{} { + return new(tracked) + }, +} + +func newTracked(tokens kvflowcontrol.Tokens, position kvflowcontrolpb.RaftLogPosition) *tracked { + t := trackedPool.Get().(*tracked) + t.tokens = tokens + t.position = position + return t +} + +func freeTracked(t *tracked) { + *t = tracked{} + trackedPool.Put(t) +} + // tracked represents tracked flow tokens; they're tracked with respect to a // raft log position (typically where the proposed command is expected to end // up). @@ -52,6 +67,10 @@ type tracked struct { position kvflowcontrolpb.RaftLogPosition } +type trackedList struct { + items []*tracked +} + // New constructs a new Tracker with the given lower bound raft log position // (below which we're not allowed to deduct tokens). func New( @@ -63,7 +82,7 @@ func New( knobs = &kvflowcontrol.TestingKnobs{} } return &Tracker{ - trackedM: make(map[admissionpb.WorkPriority][]tracked), + trackedM: make(map[admissionpb.WorkPriority]*trackedList), lowerBound: lb, knobs: knobs, stream: stream, @@ -102,8 +121,12 @@ func (dt *Tracker) Track( } dt.lowerBound = pos - if len(dt.trackedM[pri]) >= 1 { - last := dt.trackedM[pri][len(dt.trackedM[pri])-1] + if _, ok := dt.trackedM[pri]; !ok { + dt.trackedM[pri] = &trackedList{} + } + + if len(dt.trackedM[pri].items) >= 1 { + last := dt.trackedM[pri].items[len(dt.trackedM[pri].items)-1] if !last.position.Less(pos) { logFn := log.Errorf if buildutil.CrdbTestBuild { @@ -115,13 +138,7 @@ func (dt *Tracker) Track( } } - // TODO(irfansharif,aaditya): The tracked instances here make up about ~0.4% - // of allocations under kv0/enc=false/nodes=3/cpu=9. Maybe clean it up as - // part of #104154, by using a sync.Pool perhaps. - dt.trackedM[pri] = append(dt.trackedM[pri], tracked{ - tokens: tokens, - position: pos, - }) + dt.trackedM[pri].items = append(dt.trackedM[pri].items, newTracked(tokens, pos)) if log.V(1) { log.Infof(ctx, "tracking %s flow control tokens for pri=%s stream=%s pos=%s", tokens, pri, dt.stream, pos) @@ -144,11 +161,11 @@ func (dt *Tracker) Untrack( var untracked int var tokens kvflowcontrol.Tokens for { - if untracked == len(dt.trackedM[pri]) { + if untracked == len(dt.trackedM[pri].items) { break } - deduction := dt.trackedM[pri][untracked] + deduction := dt.trackedM[pri].items[untracked] if !deduction.position.LessEq(upto) { break } @@ -161,17 +178,22 @@ func (dt *Tracker) Untrack( tokens += deduction.tokens } - trackedBefore := len(dt.trackedM[pri]) - dt.trackedM[pri] = dt.trackedM[pri][untracked:] + trackedBefore := len(dt.trackedM[pri].items) + + // Free up untracked items. + for i := 0; i < untracked; i++ { + freeTracked(dt.trackedM[pri].items[i]) + } + dt.trackedM[pri].items = dt.trackedM[pri].items[untracked:] if log.V(1) { remaining := "" - if len(dt.trackedM[pri]) > 0 { - remaining = fmt.Sprintf(" (%s, ...)", dt.trackedM[pri][0].tokens) + if len(dt.trackedM[pri].items) > 0 { + remaining = fmt.Sprintf(" (%s, ...)", dt.trackedM[pri].items[0].tokens) } log.Infof(ctx, "released %s flow control tokens for %d out of %d tracked deductions for pri=%s stream=%s, up to %s; %d tracked deduction(s) remain%s", - tokens, untracked, trackedBefore, pri, dt.stream, upto, len(dt.trackedM[pri]), remaining) + tokens, untracked, trackedBefore, pri, dt.stream, upto, len(dt.trackedM[pri].items), remaining) } - if len(dt.trackedM[pri]) == 0 { + if len(dt.trackedM[pri].items) == 0 { delete(dt.trackedM, pri) } @@ -186,7 +208,7 @@ func (dt *Tracker) Untrack( func (dt *Tracker) Iter(_ context.Context, f func(admissionpb.WorkPriority, kvflowcontrol.Tokens)) { for pri, deductions := range dt.trackedM { var tokens kvflowcontrol.Tokens - for _, deduction := range deductions { + for _, deduction := range deductions.items { tokens += deduction.tokens } f(pri, tokens) @@ -229,7 +251,7 @@ func (dt *Tracker) TestingIter( f func(admissionpb.WorkPriority, kvflowcontrol.Tokens, kvflowcontrolpb.RaftLogPosition) bool, ) { for pri, deductions := range dt.trackedM { - for _, deduction := range deductions { + for _, deduction := range deductions.items { if !f(pri, deduction.tokens, deduction.position) { return }