replica_backpressure.go
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver

import (
    "context"
    "time"

    "github.com/cockroachdb/cockroach/pkg/keys"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/cockroachdb/cockroach/pkg/settings"
    "github.com/cockroachdb/cockroach/pkg/util/log"
    "github.com/pkg/errors"
)

// backpressureLogLimiter limits how often we log that backpressure is being
// applied, to at most once every 500ms.
var backpressureLogLimiter = log.Every(500 * time.Millisecond)

// backpressureRangeSizeMultiplier is the multiple of range_max_bytes that a
// range's size must grow to before backpressure will be applied on writes. Set
// to 0 to disable backpressure altogether.
var backpressureRangeSizeMultiplier = settings.RegisterValidatedFloatSetting(
    "kv.range.backpressure_range_size_multiplier",
    "multiple of range_max_bytes that a range is allowed to grow to without "+
        "splitting before writes to that range are blocked, or 0 to disable",
    2.0,
    func(v float64) error {
        if v != 0 && v < 1 {
            return errors.Errorf("backpressure multiplier cannot be smaller than 1: %f", v)
        }
        return nil
    },
)
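
// For illustration, a hedged worked example (numbers assumed, not from this
// file): with a zone configuration whose range_max_bytes is 512 MiB and the
// default multiplier of 2.0, writes to a range start being considered for
// backpressure once the range grows past 1 GiB without splitting.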

// backpressureByteTolerance exists to deal with the fact that lowering
// range_max_bytes by more than a factor of backpressureRangeSizeMultiplier
// would immediately put every existing range above the backpressure threshold.
// To mitigate this unwanted backpressure we say that any range which exceeds
// the size where backpressure would kick in by more than this quantity is
// exempted from backpressure entirely. This approach is a bit risky because a
// single command larger than this value would effectively disable backpressure
// altogether. Another downside is that if range_max_bytes is reduced by
// roughly exactly the multiplier then we'd potentially have lots of ranges in
// this exempted state.
//
// TODO(ajwerner): We could mitigate this situation further in two ways:
//
// 1) We store in-memory on each replica the largest zone configuration range
//    size (largestPreviousMaxRangeBytes) we've seen, and we do not
//    backpressure if the current range size is less than that. That value is
//    cleared when a range splits or runs GC such that the range size becomes
//    smaller than the current max range size.
//
// 2) We assign a higher priority in the split queue to ranges which are
//    currently backpressuring than to ranges which are larger but are not
//    applying backpressure.
var backpressureByteTolerance = settings.RegisterByteSizeSetting(
    "kv.range.backpressure_byte_tolerance",
    "defines the number of bytes above the product of "+
        "backpressure_range_size_multiplier and range_max_bytes at which "+
        "backpressure will not apply",
    2<<20 /* 2 MiB */)
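
// A hedged worked example (numbers assumed, not from this file), taking the
// amount over the threshold to be the range size minus the backpressure
// threshold: with range_max_bytes = 64 MiB, the default multiplier of 2.0,
// and the default tolerance of 2 MiB, only ranges whose size falls in the
// window (128 MiB, 130 MiB] are backpressured; a range that jumps past
// 130 MiB in a single step is exempted, per the risk described above.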

// backpressurableSpans contains spans of keys where write backpressuring
// is permitted. Writes to any keys within these spans may cause a batch
// to be backpressured.
var backpressurableSpans = []roachpb.Span{
    {Key: keys.TimeseriesPrefix, EndKey: keys.TimeseriesKeyMax},
    // Backpressure from the end of the system config forward instead of
    // over all table data to avoid backpressuring unsplittable ranges.
    {Key: keys.SystemConfigTableDataMax, EndKey: keys.TableDataMax},
}

// canBackpressureBatch returns whether the provided BatchRequest is eligible
// for backpressure.
func canBackpressureBatch(ba *roachpb.BatchRequest) bool {
    // Don't backpressure splits themselves.
    if ba.Txn != nil && ba.Txn.Name == splitTxnName {
        return false
    }

    // Only backpressure batches containing a "backpressurable"
    // method that is within a "backpressurable" key span.
    for _, ru := range ba.Requests {
        req := ru.GetInner()
        if !roachpb.CanBackpressure(req) {
            continue
        }

        for _, s := range backpressurableSpans {
            if s.Contains(req.Header().Span()) {
                return true
            }
        }
    }
    return false
}
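
// For illustration (a sketch, not exhaustive): a batch carrying a write such
// as a Put to an ordinary table key would typically be eligible, since the
// key falls within backpressurableSpans, while a batch issued by a split
// transaction, or one whose requests land entirely outside the spans above
// (e.g. in the system config span), is not.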

// shouldBackpressureWrites returns whether writes to the range should be
// subject to backpressure. This is based on the size of the range in
// relation to the split size. The method returns true if the range is more
// than backpressureRangeSizeMultiplier times larger than the split size but
// not larger than that by more than backpressureByteTolerance (see the
// comment on that setting for further explanation).
func (r *Replica) shouldBackpressureWrites() bool {
    mult := backpressureRangeSizeMultiplier.Get(&r.store.cfg.Settings.SV)
    if mult == 0 {
        // Disabled.
        return false
    }

    r.mu.RLock()
    defer r.mu.RUnlock()
    exceeded, bytesOver := r.exceedsMultipleOfSplitSizeRLocked(mult)
    if !exceeded {
        return false
    }
    if bytesOver > backpressureByteTolerance.Get(&r.store.cfg.Settings.SV) {
        return false
    }
    return true
}
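
// In effect, and assuming exceedsMultipleOfSplitSizeRLocked reports
// size - mult*splitSize as bytesOver (a sketch of the predicate, not a
// definitive restatement), the method returns true exactly when:
//
//   mult*splitSize < size && size <= mult*splitSize + tolerance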

// maybeBackpressureBatch blocks to apply backpressure if the replica deems
// that backpressure is necessary.
func (r *Replica) maybeBackpressureBatch(ctx context.Context, ba *roachpb.BatchRequest) error {
    if !canBackpressureBatch(ba) {
        return nil
    }

    // If we need to apply backpressure, wait for an ongoing split to finish
    // if one exists. This does not place a hard upper bound on the size of
    // a range because we don't track all in-flight requests (like we do for
    // the quota pool), but it does create an effective soft upper bound.
    for first := true; r.shouldBackpressureWrites(); first = false {
        if first {
            r.store.metrics.BackpressuredOnSplitRequests.Inc(1)
            defer r.store.metrics.BackpressuredOnSplitRequests.Dec(1)

            if backpressureLogLimiter.ShouldLog() {
                log.Warningf(ctx, "applying backpressure to limit range growth on batch %s", ba)
            }
        }

        // Register a callback on an ongoing split for this range in the splitQueue.
        splitC := make(chan error, 1)
        if !r.store.splitQueue.MaybeAddCallback(r.RangeID, func(err error) {
            splitC <- err
        }) {
            // No split ongoing. We may have raced with its completion. There's
            // no good way to prevent this race, so we conservatively allow the
            // request to proceed instead of throwing an error that would surface
            // to the client.
            return nil
        }

        // Wait for the callback to be called.
        select {
        case <-ctx.Done():
            return errors.Wrapf(
                ctx.Err(), "aborted while applying backpressure to %s on range %s", ba, r.Desc(),
            )
        case err := <-splitC:
            if err != nil {
                return errors.Wrapf(
                    err, "split failed while applying backpressure to %s on range %s", ba, r.Desc(),
                )
            }
        }
    }
    return nil
}
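
// A minimal usage sketch (hypothetical caller, not part of this file): a
// write path would consult maybeBackpressureBatch before evaluating a batch,
// e.g.:
//
//   func (r *Replica) sendWrite(ctx context.Context, ba *roachpb.BatchRequest) error {
//       if err := r.maybeBackpressureBatch(ctx, ba); err != nil {
//           return err // context canceled or split failed while waiting
//       }
//       // ... evaluate and propose the batch ...
//       return nil
//   }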