-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathtracker.go
268 lines (244 loc) · 8.44 KB
/
tracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvflowtokentracker
import (
"context"
"fmt"
"sort"
"strings"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowcontrolpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvflowcontrol/kvflowinspectpb"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
)
// Tracker tracks flow token deductions. Tokens are deducted for proposed
// commands (identified by raft log position), with a given
// admissionpb.WorkPriority, for replication along an individual
// kvflowcontrol.Stream.
type Tracker struct {
trackedM map[admissionpb.WorkPriority][]tracked
// lowerBound tracks on a per-stream basis the log position below which
// we ignore token deductions.
lowerBound kvflowcontrolpb.RaftLogPosition
stream kvflowcontrol.Stream // used for logging only
knobs *kvflowcontrol.TestingKnobs
}
// tracked represents tracked flow tokens; they're tracked with respect to a
// raft log position (typically where the proposed command is expected to end
// up).
type tracked struct {
tokens kvflowcontrol.Tokens
position kvflowcontrolpb.RaftLogPosition
}
// New constructs a new Tracker with the given lower bound raft log position
// (below which we're not allowed to deduct tokens).
func New(
lb kvflowcontrolpb.RaftLogPosition,
stream kvflowcontrol.Stream,
knobs *kvflowcontrol.TestingKnobs,
) *Tracker {
if knobs == nil {
knobs = &kvflowcontrol.TestingKnobs{}
}
return &Tracker{
trackedM: make(map[admissionpb.WorkPriority][]tracked),
lowerBound: lb,
knobs: knobs,
stream: stream,
}
}
// Track token deductions of the given priority with the given raft log
// position.
func (dt *Tracker) Track(
ctx context.Context,
pri admissionpb.WorkPriority,
tokens kvflowcontrol.Tokens,
pos kvflowcontrolpb.RaftLogPosition,
) bool {
if !(dt.lowerBound.Less(pos)) {
// We're trying to track a token deduction at a position less than the
// stream's lower-bound. Shout loudly but ultimately no-op. This
// regression indicates buggy usage since:
// - token deductions are done so with monotonically increasing log
// positions (see Handle.DeductTokensFor);
// - the monotonically increasing log positions for token deductions
// also extends to positions at which streams are connected,
// which typically happen when (a) members are added to the raft
// group, (b) previously crashed follower nodes restart, (c)
// replicas are unpaused, or (d) caught back up via snapshots (see
// Handle.ConnectStream).
// - token returns upto some log position don't precede deductions at
// lower log positions (see Handle.ReturnTokensUpto);
logFn := log.Errorf
if buildutil.CrdbTestBuild {
logFn = log.Fatalf
}
logFn(ctx, "observed raft log position less than per-stream lower bound (%s <= %s)",
pos, dt.lowerBound)
return false
}
dt.lowerBound = pos
if len(dt.trackedM[pri]) >= 1 {
last := dt.trackedM[pri][len(dt.trackedM[pri])-1]
if !last.position.Less(pos) {
logFn := log.Errorf
if buildutil.CrdbTestBuild {
logFn = log.Fatalf
}
logFn(ctx, "expected in order tracked log positions (%s < %s)",
last.position, pos)
return false
}
}
dt.trackedM[pri] = append(dt.trackedM[pri], tracked{
tokens: tokens,
position: pos,
})
if log.ExpensiveLogEnabled(ctx, 1) {
log.Infof(ctx, "tracking %s flow control tokens for pri=%s stream=%s pos=%s",
tokens, pri, dt.stream, pos)
}
return true
}
// Untrack all token deductions of the given priority that have log positions
// less than or equal to the one provided.
func (dt *Tracker) Untrack(
ctx context.Context, pri admissionpb.WorkPriority, upto kvflowcontrolpb.RaftLogPosition,
) kvflowcontrol.Tokens {
if dt == nil {
return 0
}
if _, ok := dt.trackedM[pri]; !ok {
return 0
}
var untracked int
var tokens kvflowcontrol.Tokens
for {
if untracked == len(dt.trackedM[pri]) {
break
}
deduction := dt.trackedM[pri][untracked]
if !deduction.position.LessEq(upto) {
break
}
if fn := dt.knobs.UntrackTokensInterceptor; fn != nil {
fn(deduction.tokens, deduction.position)
}
untracked += 1
tokens += deduction.tokens
}
trackedBefore := len(dt.trackedM[pri])
dt.trackedM[pri] = dt.trackedM[pri][untracked:]
if log.ExpensiveLogEnabled(ctx, 1) {
remaining := ""
if len(dt.trackedM[pri]) > 0 {
remaining = fmt.Sprintf(" (%s, ...)", dt.trackedM[pri][0].tokens)
}
log.Infof(ctx, "released %s flow control tokens for %d out of %d tracked deductions for pri=%s stream=%s, up to %s; %d tracked deduction(s) remain%s",
tokens, untracked, trackedBefore, pri, dt.stream, upto, len(dt.trackedM[pri]), remaining)
}
if len(dt.trackedM[pri]) == 0 {
delete(dt.trackedM, pri)
}
if dt.lowerBound.Less(upto) {
dt.lowerBound = upto
}
return tokens
}
// Iter iterates through all tracked token deductions, invoking the provided
// callback with the sum of all tokens at a per-priority level.
func (dt *Tracker) Iter(_ context.Context, f func(admissionpb.WorkPriority, kvflowcontrol.Tokens)) {
for pri, deductions := range dt.trackedM {
var tokens kvflowcontrol.Tokens
for _, deduction := range deductions {
tokens += deduction.tokens
}
f(pri, tokens)
}
}
// LowerBound returns the log position below which we ignore token deductions.
func (dt *Tracker) LowerBound() kvflowcontrolpb.RaftLogPosition {
return dt.lowerBound
}
// Inspect returns a snapshot of all tracked token deductions. It's used to
// power /inspectz-style debugging pages.
func (dt *Tracker) Inspect(ctx context.Context) []kvflowinspectpb.TrackedDeduction {
var deductions []kvflowinspectpb.TrackedDeduction
dt.TestingIter(func(pri admissionpb.WorkPriority, tokens kvflowcontrol.Tokens, pos kvflowcontrolpb.RaftLogPosition) bool {
deductions = append(deductions, kvflowinspectpb.TrackedDeduction{
Priority: int32(pri),
Tokens: int64(tokens),
RaftLogPosition: pos,
})
return true
})
sort.Slice(deductions, func(i, j int) bool { // for determinism
if deductions[i].Priority != deductions[j].Priority {
return deductions[i].Priority < deductions[j].Priority
}
if deductions[i].RaftLogPosition != deductions[j].RaftLogPosition {
return deductions[i].RaftLogPosition.Less(deductions[j].RaftLogPosition)
}
return deductions[i].Tokens < deductions[j].Tokens
})
return deductions
}
// TestingIter is a testing-only re-implementation of Iter. It iterates through
// all tracked token deductions, invoking the provided callback with tracked
// pri<->token<->position triples.
func (dt *Tracker) TestingIter(
f func(admissionpb.WorkPriority, kvflowcontrol.Tokens, kvflowcontrolpb.RaftLogPosition) bool,
) {
for pri, deductions := range dt.trackedM {
for _, deduction := range deductions {
if !f(pri, deduction.tokens, deduction.position) {
return
}
}
}
}
// TestingPrintIter iterates through all tracked tokens and returns a printable
// string, for use in tests.
func (dt *Tracker) TestingPrintIter() string {
type tracked struct {
tokens kvflowcontrol.Tokens
raftLogPosition kvflowcontrolpb.RaftLogPosition
}
const numPriorities = int(admissionpb.HighPri) - int(admissionpb.LowPri)
deductions := [numPriorities][]tracked{}
dt.TestingIter(
func(pri admissionpb.WorkPriority, tokens kvflowcontrol.Tokens, pos kvflowcontrolpb.RaftLogPosition) bool {
i := int(pri) - int(admissionpb.LowPri)
deductions[i] = append(deductions[i], tracked{
tokens: tokens,
raftLogPosition: pos,
})
return true
},
)
var buf strings.Builder
for i, ds := range deductions {
pri := i + int(admissionpb.LowPri)
if len(ds) == 0 {
continue
}
buf.WriteString(fmt.Sprintf("pri=%s\n", admissionpb.WorkPriority(pri)))
for _, deduction := range ds {
buf.WriteString(fmt.Sprintf(" tokens=%s %s\n",
testingPrintTrimmedTokens(deduction.tokens), deduction.raftLogPosition))
}
}
return buf.String()
}
func testingPrintTrimmedTokens(t kvflowcontrol.Tokens) string {
return strings.TrimPrefix(strings.ReplaceAll(t.String(), " ", ""), "+")
}