forked from cockroachdb/cockroach
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbase.go
241 lines (215 loc) · 8.17 KB
/
base.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
// Copyright 2016 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package kvserverbase
import (
"bytes"
"context"
"fmt"
"time"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)
// MergeQueueEnabled is the boolean setting "kv.range_merge.queue_enabled",
// which controls whether the automatic merge queue is enabled. It is
// registered with the settings.SystemOnly class and defaults to true.
var MergeQueueEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kv.range_merge.queue_enabled",
"whether the automatic merge queue is enabled",
true,
)
// CmdIDKey is the ID of a Raft command. Values of this type are logged
// unredacted, so keep them random.
type CmdIDKey string

// Compile-time check that CmdIDKey implements redact.SafeFormatter.
var _ redact.SafeFormatter = CmdIDKey("")

// SafeFormat implements redact.SafeFormatter. The ID is printed with the %x
// verb and marked as a safe string; the verb passed in is ignored.
func (id CmdIDKey) SafeFormat(p redact.SafePrinter, _ rune) {
	p.Printf("%x", redact.SafeString(id))
}

// String implements fmt.Stringer by formatting the ID with
// redact.StringWithoutMarkers.
func (id CmdIDKey) String() string {
	return redact.StringWithoutMarkers(id)
}
// FilterArgs groups the arguments to a ReplicaCommandFilter.
type FilterArgs struct {
// Ctx is the context handed to the filter.
Ctx context.Context
// CmdID is the Raft command ID. It is empty when the filter runs outside of
// a Raft command (see InRaftCmd).
CmdID CmdIDKey
// Index is presumably the position of Req within its batch - TODO confirm.
Index int
// Sid is a store ID; presumably the store handling Req - TODO confirm.
Sid roachpb.StoreID
// Req is the request the filter is invoked with.
Req roachpb.Request
// Hdr is presumably the header of the batch that carries Req - TODO confirm.
Hdr roachpb.Header
Version roachpb.Version
Err error // only used for TestingPostEvalFilter
}
// ProposalFilterArgs groups the arguments to a ReplicaProposalFilter.
type ProposalFilterArgs struct {
// Ctx is the context handed to the filter.
Ctx context.Context
// Cmd is the Raft command handed to the filter; presumably the command that
// is about to be proposed - TODO confirm.
Cmd *kvserverpb.RaftCommand
// QuotaAlloc is a quota pool allocation; presumably the one acquired for
// this proposal - TODO confirm.
QuotaAlloc *quotapool.IntAlloc
// CmdID is the ID of the Raft command.
CmdID CmdIDKey
// Req is the batch request handed to the filter.
Req roachpb.BatchRequest
}
// ApplyFilterArgs groups the arguments to a ReplicaApplyFilter.
type ApplyFilterArgs struct {
// ReplicatedEvalResult is embedded, so its fields are accessible directly
// on ApplyFilterArgs.
kvserverpb.ReplicatedEvalResult
// CmdID is the ID of the Raft command.
CmdID CmdIDKey
// RangeID and StoreID are presumably those of the replica applying the
// command - TODO confirm.
RangeID roachpb.RangeID
StoreID roachpb.StoreID
Req *roachpb.BatchRequest // only set on the leaseholder
// ForcedError is presumably an error already attached to the command before
// the filter runs - TODO confirm.
ForcedError *roachpb.Error
}
// InRaftCmd reports whether the filter is being invoked in the context of a
// Raft command, which is the case exactly when CmdID is non-empty. A filter
// can also run outside of a Raft command, for example for a read.
func (f *FilterArgs) InRaftCmd() bool {
	return len(f.CmdID) > 0
}
// ReplicaRequestFilter can be used in testing to influence the error returned
// from a request before it is evaluated. Return nil to continue with regular
// processing or non-nil to terminate processing with the returned error.
type ReplicaRequestFilter func(context.Context, *roachpb.BatchRequest) *roachpb.Error
// ReplicaConcurrencyRetryFilter can be used to examine a concurrency retry
// error before it is handled and its batch is re-evaluated. The filter has no
// return value; it receives the batch request and the error.
type ReplicaConcurrencyRetryFilter func(context.Context, *roachpb.BatchRequest, *roachpb.Error)
// ReplicaCommandFilter may be used in tests through the StoreTestingKnobs to
// intercept the handling of commands and artificially generate errors. Return
// nil to continue with regular processing or non-nil to terminate processing
// with the returned error.
type ReplicaCommandFilter func(args FilterArgs) *roachpb.Error
// ReplicaProposalFilter can be used in testing to influence the error returned
// from proposals after a request is evaluated but before it is proposed.
type ReplicaProposalFilter func(args ProposalFilterArgs) *roachpb.Error
// A ReplicaApplyFilter is a testing hook into raft command application.
// See StoreTestingKnobs.
// NOTE(review): the meaning of the int result is not documented here - confirm
// against the code that invokes the filter.
type ReplicaApplyFilter func(args ApplyFilterArgs) (int, *roachpb.Error)
// ReplicaResponseFilter is used in unit tests to modify the outbound
// response returned to a waiting client after a replica command has
// been processed. This filter is invoked only by the command proposer.
type ReplicaResponseFilter func(context.Context, *roachpb.BatchRequest, *roachpb.BatchResponse) *roachpb.Error
// ReplicaRangefeedFilter is used in unit tests to modify the request, inject
// responses, or return errors from rangefeeds. It receives the rangefeed
// request and the event sink of the stream.
type ReplicaRangefeedFilter func(
args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink,
) *roachpb.Error
// ContainsKey reports whether the range described by desc contains key.
//
// A key that starts with keys.LocalRangeIDPrefix is contained only if it also
// starts with the range-ID prefix built from desc.RangeID. Any other key is
// first translated with keys.Addr and then checked against the descriptor; a
// key for which keys.Addr fails is reported as not contained.
func ContainsKey(desc *roachpb.RangeDescriptor, key roachpb.Key) bool {
	if !bytes.HasPrefix(key, keys.LocalRangeIDPrefix) {
		addr, err := keys.Addr(key)
		return err == nil && desc.ContainsKey(addr)
	}
	rangeIDPrefix := keys.MakeRangeIDPrefix(desc.RangeID)
	return bytes.HasPrefix(key, rangeIDPrefix)
}
// ContainsKeyRange reports whether the range described by desc contains the
// key range from start to end. Both keys are translated with keys.Addr before
// the descriptor is consulted; if either translation fails the result is
// false.
func ContainsKeyRange(desc *roachpb.RangeDescriptor, start, end roachpb.Key) bool {
	lo, err := keys.Addr(start)
	if err != nil {
		return false
	}
	hi, err := keys.Addr(end)
	return err == nil && desc.ContainsKeyRange(lo, hi)
}
// IntersectSpan takes a span and a descriptor. It then splits the span
// into up to three pieces: a first piece which is contained in the range
// (returned as middle), and a slice of up to two further spans which lie
// outside of the range's key bounds (returned as outside).
//
// A span whose EndKey is empty is not split: it is returned unchanged as the
// only element of outside, and middle is nil.
// NOTE(review): this comment used to state that a span for which
// [Key, EndKey) is empty does not result in any spans. That does not match
// the EndKey check at the top of the function - confirm what callers expect.
//
// A span whose Key sorts below keys.LocalRangeMax is never split: it is
// returned either as middle or as the only element of outside, depending on
// what ContainsKeyRange reports for it. Passing such a span with an EndKey at
// or above keys.LocalRangeMax results in a panic.
//
// TODO(tschottdorf): move to proto, make more gen-purpose - kv.truncate does
// some similar things.
func IntersectSpan(
span roachpb.Span, desc *roachpb.RangeDescriptor,
) (middle *roachpb.Span, outside []roachpb.Span) {
// start and end are the descriptor's bounds as raw keys.
start, end := desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey()
if len(span.EndKey) == 0 {
// No EndKey: hand the span back untouched via outside.
outside = append(outside, span)
return
}
if bytes.Compare(span.Key, keys.LocalRangeMax) < 0 {
// The span starts below keys.LocalRangeMax, so it must also end below it.
if bytes.Compare(span.EndKey, keys.LocalRangeMax) >= 0 {
panic(fmt.Sprintf("a local intent range may not have a non-local portion: %s", span))
}
// All or nothing: the span is either entirely middle or entirely outside.
if ContainsKeyRange(desc, span.Key, span.EndKey) {
return &span, nil
}
return nil, append(outside, span)
}
// From now on, we're dealing with plain old key ranges - no more local
// addressing.
if bytes.Compare(span.Key, start) < 0 {
// Span spans a part to the left of [start, end).
iCopy := span
// Clip the left piece at start, unless the span already ends at or before
// start, in which case the whole span is the left piece.
if bytes.Compare(start, span.EndKey) < 0 {
iCopy.EndKey = start
}
// Advance span past the piece that was just split off.
span.Key = iCopy.EndKey
outside = append(outside, iCopy)
}
if bytes.Compare(span.Key, span.EndKey) < 0 && bytes.Compare(end, span.EndKey) < 0 {
// Span spans a part to the right of [start, end).
iCopy := span
// Clip the right piece at end, unless what is left of the span already
// starts at or after end.
if bytes.Compare(iCopy.Key, end) < 0 {
iCopy.Key = end
}
// Shrink span so that it ends where the right piece begins.
span.EndKey = iCopy.Key
outside = append(outside, iCopy)
}
// Whatever is left of span is the middle piece, provided it is non-empty and
// lies within [start, end).
if bytes.Compare(span.Key, span.EndKey) < 0 && bytes.Compare(span.Key, start) >= 0 && bytes.Compare(end, span.EndKey) >= 0 {
middle = &span
}
return
}
// SplitByLoadMergeDelay is the duration setting
// "kv.range_split.by_load_merge_delay". It defaults to five minutes, and its
// validator rejects any value shorter than five seconds.
var SplitByLoadMergeDelay = settings.RegisterDurationSetting(
	settings.TenantWritable,
	"kv.range_split.by_load_merge_delay",
	"the delay that range splits created due to load will wait before considering being merged away",
	5*time.Minute,
	func(delay time.Duration) error {
		const lowerBound = 5 * time.Second
		if delay >= lowerBound {
			return nil
		}
		return errors.Errorf("cannot be set to a value below %s", lowerBound)
	},
)
const (
// MaxCommandSizeDefault is the default for the kv.raft.command.max_size
// cluster setting.
MaxCommandSizeDefault = 64 << 20 // 64 MiB
// MaxCommandSizeFloor is the minimum allowed value for the
// kv.raft.command.max_size cluster setting.
MaxCommandSizeFloor = 4 << 20 // 4 MiB
)
// MaxCommandSize is the byte-size setting "kv.raft.command.max_size", the
// maximum size of a raft command. It defaults to MaxCommandSizeDefault, and
// its validator rejects any value smaller than MaxCommandSizeFloor.
var MaxCommandSize = settings.RegisterByteSizeSetting(
	settings.TenantWritable,
	"kv.raft.command.max_size",
	"maximum size of a raft command",
	MaxCommandSizeDefault,
	func(size int64) error {
		// The floor itself is an accepted value, so the message says "at
		// least" rather than "greater than".
		if size < MaxCommandSizeFloor {
			return fmt.Errorf("max_size must be at least %s", humanizeutil.IBytes(MaxCommandSizeFloor))
		}
		return nil
	},
)