client_replica_backpressure_test.go
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
	"context"
	"fmt"
	"net/url"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/testutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/randutil"
	"github.com/jackc/pgx"
	"github.com/pkg/errors"
	"github.com/stretchr/testify/require"
)
// Test that the mitigations to backpressure when reducing the range size work.
func TestBackpressureNotAppliedWhenReducingRangeSize(t *testing.T) {
	defer leaktest.AfterTest(t)()

	rRand, _ := randutil.NewPseudoRand()
	ctx := context.Background()

	// Some arbitrary data sizes we'll load into a table and then use to derive
	// range size parameters. We want something not too tiny but also not so big
	// that it takes a while to load.
	const (
		rowSize  = 16 << 10
		dataSize = 512 << 10
		numRows  = dataSize / rowSize
	)
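	// (With rowSize at 16 KiB and dataSize at 512 KiB, numRows works out to 32.)
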
	// setup will set up a testcluster with a table filled with data. All splits
	// will be blocked until the returned closure is called.
	setup := func(t *testing.T, numServers int) (
		tc *testcluster.TestCluster,
		args base.TestClusterArgs,
		tdb *sqlutils.SQLRunner,
		tablePrefix roachpb.Key,
		unblockSplit func(),
		waitForBlockedRange func(id roachpb.RangeID),
	) {
		// Add a testing knob to block split transactions which we'll enable before
		// we return from setup.
		var allowSplits atomic.Value
		allowSplits.Store(true)
		unblockCh := make(chan struct{}, 1)
		var rangesBlocked sync.Map
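		// allowSplits gates the request filter below: while it holds false, any
		// batch sent by a transaction named "split" is held up. rangesBlocked
		// records which ranges currently have a split waiting, and closing
		// unblockCh releases all of them with a "splits disabled" error.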
		args = base.TestClusterArgs{
			ServerArgs: base.TestServerArgs{
				Knobs: base.TestingKnobs{
					Store: &kvserver.StoreTestingKnobs{
						TestingRequestFilter: func(ctx context.Context, ba roachpb.BatchRequest) *roachpb.Error {
							if ba.Header.Txn != nil && ba.Header.Txn.Name == "split" && !allowSplits.Load().(bool) {
								rangesBlocked.Store(ba.Header.RangeID, true)
								defer rangesBlocked.Delete(ba.Header.RangeID)
								select {
								case <-unblockCh:
									return roachpb.NewError(errors.Errorf("splits disabled"))
								case <-ctx.Done():
									<-ctx.Done()
								}
							}
							return nil
						},
					},
				},
			},
		}
		tc = testcluster.StartTestCluster(t, numServers, args)
		require.NoError(t, tc.WaitForFullReplication())

		// Create the table, split it off, and load it up with data.
		tdb = sqlutils.MakeSQLRunner(tc.ServerConn(0))
		tdb.Exec(t, "CREATE TABLE foo (k INT PRIMARY KEY, v BYTES NOT NULL)")
		var tableID int
		tdb.QueryRow(t, "SELECT table_id FROM crdb_internal.tables WHERE name = 'foo'").Scan(&tableID)
		require.NotEqual(t, 0, tableID)
		tablePrefix = keys.MakeTablePrefix(uint32(tableID))
		tc.SplitRangeOrFatal(t, tablePrefix)
		require.NoError(t, tc.WaitForSplitAndInitialization(tablePrefix))
		for i := 0; i < dataSize/rowSize; i++ {
			tdb.Exec(t, "UPSERT INTO foo VALUES ($1, $2)",
				rRand.Intn(numRows), randutil.RandBytes(rRand, rowSize))
		}
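		// The random keys above may collide, so the table ends up holding at most
		// numRows rows and at most dataSize bytes of value data.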
		// Block splits and return.
		allowSplits.Store(false)
		var closeOnce sync.Once
		unblockSplit = func() {
			closeOnce.Do(func() {
				allowSplits.Store(true)
				close(unblockCh)
			})
		}
		waitForBlockedRange = func(id roachpb.RangeID) {
			testutils.SucceedsSoon(t, func() error {
				if _, ok := rangesBlocked.Load(id); !ok {
					return errors.Errorf("waiting for %v to be blocked", id)
				}
				return nil
			})
		}
		return tc, args, tdb, tablePrefix, unblockSplit, waitForBlockedRange
	}
	waitForZoneConfig := func(t *testing.T, tc *testcluster.TestCluster, tablePrefix roachpb.Key, exp int64) {
		testutils.SucceedsSoon(t, func() error {
			for i := 0; i < tc.NumServers(); i++ {
				s := tc.Server(i)
				_, r := getFirstStoreReplica(t, s, tablePrefix)
				_, zone := r.DescAndZone()
				if *zone.RangeMaxBytes != exp {
					return fmt.Errorf("expected %d, got %d", exp, *zone.RangeMaxBytes)
				}
			}
			return nil
		})
	}
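	// moveTableToNewStore adds a second server to the cluster, replicates the
	// table's range onto it, transfers the range lease to it, and then removes
	// the replica from the original server, leaving the new node alone to serve
	// the range.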
	moveTableToNewStore := func(t *testing.T, tc *testcluster.TestCluster, args base.TestClusterArgs, tablePrefix roachpb.Key) {
		tc.AddServer(t, args.ServerArgs)
		testutils.SucceedsSoon(t, func() error {
			desc, err := tc.LookupRange(tablePrefix)
			require.NoError(t, err)
			voters := desc.Replicas().Voters()
			if len(voters) == 1 && voters[0].NodeID == tc.Server(1).NodeID() {
				return nil
			}
			if len(voters) == 1 {
				desc, err = tc.AddReplicas(tablePrefix, tc.Target(1))
				if err != nil {
					return err
				}
			}
			if err = tc.TransferRangeLease(desc, tc.Target(1)); err != nil {
				return err
			}
			_, err = tc.RemoveReplicas(tablePrefix, tc.Target(0))
			return err
		})
	}
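	// Each subtest below starts a fresh single-node cluster with splits blocked,
	// shrinks the table's zone config so the range exceeds its configured maximum,
	// and then checks whether a subsequent write is backpressured.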
t.Run("no backpressure when much larger on existing node", func(t *testing.T) {
tc, _, tdb, tablePrefix, unblockSplits, _ := setup(t, 1)
defer tc.Stopper().Stop(ctx)
defer unblockSplits()
tdb.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING "+
"range_max_bytes = $1, range_min_bytes = $2", dataSize/5, dataSize/10)
waitForZoneConfig(t, tc, tablePrefix, dataSize/5)
// Don't observe backpressure.
tdb.Exec(t, "UPSERT INTO foo VALUES ($1, $2)",
rRand.Intn(10000000), randutil.RandBytes(rRand, rowSize))
})
t.Run("no backpressure when much larger on new node", func(t *testing.T) {
tc, args, tdb, tablePrefix, unblockSplits, _ := setup(t, 1)
defer tc.Stopper().Stop(ctx)
defer unblockSplits()
// We didn't want to have to load too much data into these ranges because
// it makes the testing slower so let's lower the threshold at which we'll
// consider the range to be way over the backpressure limit from megabytes
// down to kilobytes.
tdb.Exec(t, "SET CLUSTER SETTING kv.range.backpressure_byte_tolerance = '1 KiB'")
tdb.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING "+
"range_max_bytes = $1, range_min_bytes = $2", dataSize/5, dataSize/10)
waitForZoneConfig(t, tc, tablePrefix, dataSize/5)
// Then we'll add a new server and move the table there.
moveTableToNewStore(t, tc, args, tablePrefix)
// Don't observe backpressure.
tdb.Exec(t, "UPSERT INTO foo VALUES ($1, $2)",
rRand.Intn(10000000), randutil.RandBytes(rRand, rowSize))
})
t.Run("no backpressure when near limit on existing node", func(t *testing.T) {
tc, _, tdb, tablePrefix, unblockSplits, _ := setup(t, 1)
defer tc.Stopper().Stop(ctx)
defer unblockSplits()
// We didn't want to have to load too much data into these ranges because
// it makes the testing slower so let's lower the threshold at which we'll
// consider the range to be way over the backpressure limit from megabytes
// down to kilobytes.
tdb.Exec(t, "SET CLUSTER SETTING kv.range.backpressure_byte_tolerance = '1 KiB'")
// Now we'll change the range_max_bytes to be 2x the range size and within the
// slop limit. We won't see backpressure because the range will remember
// its previous zone config setting.
s, repl := getFirstStoreReplica(t, tc.Server(0), tablePrefix.Next())
newMax := repl.GetMVCCStats().Total()/2 - 32<<10
newMin := newMax / 4
tdb.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING "+
"range_max_bytes = $1, range_min_bytes = $2", newMax, newMin)
waitForZoneConfig(t, tc, tablePrefix, newMax)
// Don't observe backpressure because we remember the previous max size on
// this node.
tdb.Exec(t, "UPSERT INTO foo VALUES ($1, $2)",
rRand.Intn(10000000), randutil.RandBytes(rRand, rowSize))
// Allow one split to occur and make sure that the remembered value is
// cleared.
unblockSplits()
testutils.SucceedsSoon(t, func() error {
if size := repl.LargestPreviousMaxRangeSizeBytes(); size != 0 {
_ = s.ForceSplitScanAndProcess()
return errors.Errorf("expected LargestPreviousMaxRangeSizeBytes to be 0, got %d", size)
}
return nil
})
})
t.Run("backpressure when near limit on new node", func(t *testing.T) {
tc, args, tdb, tablePrefix, unblockSplits, waitForBlocked := setup(t, 1)
defer tc.Stopper().Stop(ctx)
defer unblockSplits()
// Now we'll change the range_max_bytes to be 2x the range size.
_, repl := getFirstStoreReplica(t, tc.Server(0), tablePrefix.Next())
newMax := repl.GetMVCCStats().Total()/2 - 32<<10
newMin := newMax / 4
tdb.Exec(t, "ALTER TABLE foo CONFIGURE ZONE USING "+
"range_max_bytes = $1, range_min_bytes = $2", newMax, newMin)
waitForZoneConfig(t, tc, tablePrefix, newMax)
// Then we'll add a new server and move the table there.
moveTableToNewStore(t, tc, args, tablePrefix)
s, repl := getFirstStoreReplica(t, tc.Server(1), tablePrefix)
s.SetReplicateQueueActive(false)
require.Len(t, repl.Desc().Replicas().All(), 1)
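		// With the replicate queue disabled, the range stays as a single replica
		// on the newly added node, which never saw the old, larger zone config.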
		// We really need to make sure that the split queue has hit this range,
		// otherwise we'll fail to backpressure.
		go func() { _ = s.ForceSplitScanAndProcess() }()
		waitForBlocked(repl.RangeID)

		// Observe backpressure now that the range is just over the limit.
		// Use pgx so that cancellation does something reasonable.
		url, cleanup := sqlutils.PGUrl(t, tc.Server(1).ServingSQLAddr(), "", url.User("root"))
		defer cleanup()
		conf, err := pgx.ParseConnectionString(url.String())
		require.NoError(t, err)
		c, err := pgx.Connect(conf)
		require.NoError(t, err)
		ctxWithCancel, cancel := context.WithCancel(ctx)
		defer cancel()
		upsertErrCh := make(chan error)
		go func() {
			_, err := c.ExecEx(ctxWithCancel, "UPSERT INTO foo VALUES ($1, $2)",
				nil /* options */, rRand.Intn(numRows), randutil.RandBytes(rRand, rowSize))
			upsertErrCh <- err
		}()
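		// The UPSERT above should block on backpressure. Give it a moment to get
		// stuck, then cancel its context and confirm the query returns
		// context.Canceled instead of completing.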
		select {
		case <-time.After(10 * time.Millisecond):
			cancel()
		case err := <-upsertErrCh:
			t.Fatalf("expected no error because the request should hang, got %v", err)
		}
		require.Equal(t, context.Canceled, <-upsertErrCh)
	})
}