Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kvflowcontrol: performance optimizations in tracker.go #110503

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 47 additions & 25 deletions pkg/kv/kvserver/kvflowcontrol/kvflowtokentracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"
"sort"
"strings"
"sync"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
Expand All @@ -29,11 +30,7 @@ import (
// admissionpb.WorkPriority, for replication along an individual
// kvflowcontrol.Stream.
type Tracker struct {
// TODO(irfansharif,aaditya): Everytime we track something, we incur a map
// assignment (shows up in CPU profiles). We could introduce a struct that
// internally embeds this list of tracked deductions, and append there
// instead. Do this as part of #104154.
trackedM map[admissionpb.WorkPriority][]tracked
trackedM map[admissionpb.WorkPriority]*trackedList

// lowerBound tracks on a per-stream basis the log position below which
// we ignore token deductions.
Expand All @@ -44,6 +41,24 @@ type Tracker struct {
knobs *kvflowcontrol.TestingKnobs
}

var trackedPool sync.Pool = sync.Pool{
New: func() interface{} {
return new(tracked)
},
}

func newTracked(tokens kvflowcontrol.Tokens, position kvflowcontrolpb.RaftLogPosition) *tracked {
t := trackedPool.Get().(*tracked)
t.tokens = tokens
t.position = position
return t
}

func freeTracked(t *tracked) {
*t = tracked{}
trackedPool.Put(t)
}

// tracked represents tracked flow tokens; they're tracked with respect to a
// raft log position (typically where the proposed command is expected to end
// up).
Expand All @@ -52,6 +67,10 @@ type tracked struct {
position kvflowcontrolpb.RaftLogPosition
}

type trackedList struct {
items []*tracked
}

// New constructs a new Tracker with the given lower bound raft log position
// (below which we're not allowed to deduct tokens).
func New(
Expand All @@ -63,7 +82,7 @@ func New(
knobs = &kvflowcontrol.TestingKnobs{}
}
return &Tracker{
trackedM: make(map[admissionpb.WorkPriority][]tracked),
trackedM: make(map[admissionpb.WorkPriority]*trackedList),
lowerBound: lb,
knobs: knobs,
stream: stream,
Expand Down Expand Up @@ -102,8 +121,12 @@ func (dt *Tracker) Track(
}
dt.lowerBound = pos

if len(dt.trackedM[pri]) >= 1 {
last := dt.trackedM[pri][len(dt.trackedM[pri])-1]
if _, ok := dt.trackedM[pri]; !ok {
dt.trackedM[pri] = &trackedList{}
}

if len(dt.trackedM[pri].items) >= 1 {
last := dt.trackedM[pri].items[len(dt.trackedM[pri].items)-1]
if !last.position.Less(pos) {
logFn := log.Errorf
if buildutil.CrdbTestBuild {
Expand All @@ -115,13 +138,7 @@ func (dt *Tracker) Track(
}
}

// TODO(irfansharif,aaditya): The tracked instances here make up about ~0.4%
// of allocations under kv0/enc=false/nodes=3/cpu=9. Maybe clean it up as
// part of #104154, by using a sync.Pool perhaps.
dt.trackedM[pri] = append(dt.trackedM[pri], tracked{
tokens: tokens,
position: pos,
})
dt.trackedM[pri].items = append(dt.trackedM[pri].items, newTracked(tokens, pos))
if log.V(1) {
log.Infof(ctx, "tracking %s flow control tokens for pri=%s stream=%s pos=%s",
tokens, pri, dt.stream, pos)
Expand All @@ -144,11 +161,11 @@ func (dt *Tracker) Untrack(
var untracked int
var tokens kvflowcontrol.Tokens
for {
if untracked == len(dt.trackedM[pri]) {
if untracked == len(dt.trackedM[pri].items) {
break
}

deduction := dt.trackedM[pri][untracked]
deduction := dt.trackedM[pri].items[untracked]
if !deduction.position.LessEq(upto) {
break
}
Expand All @@ -161,17 +178,22 @@ func (dt *Tracker) Untrack(
tokens += deduction.tokens
}

trackedBefore := len(dt.trackedM[pri])
dt.trackedM[pri] = dt.trackedM[pri][untracked:]
trackedBefore := len(dt.trackedM[pri].items)

// Free up untracked items.
for i := 0; i < untracked; i++ {
freeTracked(dt.trackedM[pri].items[i])
}
dt.trackedM[pri].items = dt.trackedM[pri].items[untracked:]
if log.V(1) {
remaining := ""
if len(dt.trackedM[pri]) > 0 {
remaining = fmt.Sprintf(" (%s, ...)", dt.trackedM[pri][0].tokens)
if len(dt.trackedM[pri].items) > 0 {
remaining = fmt.Sprintf(" (%s, ...)", dt.trackedM[pri].items[0].tokens)
}
log.Infof(ctx, "released %s flow control tokens for %d out of %d tracked deductions for pri=%s stream=%s, up to %s; %d tracked deduction(s) remain%s",
tokens, untracked, trackedBefore, pri, dt.stream, upto, len(dt.trackedM[pri]), remaining)
tokens, untracked, trackedBefore, pri, dt.stream, upto, len(dt.trackedM[pri].items), remaining)
}
if len(dt.trackedM[pri]) == 0 {
if len(dt.trackedM[pri].items) == 0 {
delete(dt.trackedM, pri)
}

Expand All @@ -186,7 +208,7 @@ func (dt *Tracker) Untrack(
func (dt *Tracker) Iter(_ context.Context, f func(admissionpb.WorkPriority, kvflowcontrol.Tokens)) {
for pri, deductions := range dt.trackedM {
var tokens kvflowcontrol.Tokens
for _, deduction := range deductions {
for _, deduction := range deductions.items {
tokens += deduction.tokens
}
f(pri, tokens)
Expand Down Expand Up @@ -229,7 +251,7 @@ func (dt *Tracker) TestingIter(
f func(admissionpb.WorkPriority, kvflowcontrol.Tokens, kvflowcontrolpb.RaftLogPosition) bool,
) {
for pri, deductions := range dt.trackedM {
for _, deduction := range deductions {
for _, deduction := range deductions.items {
if !f(pri, deduction.tokens, deduction.position) {
return
}
Expand Down