Skip to content

Commit

Permalink
kvserver: fix delaying of splits with uninitialized followers
Browse files Browse the repository at this point in the history
Bursts of splits (i.e. a sequence of splits for which each split splits
the right-hand side of a prior split) can cause issues. This is because
splitting a range in which a replica needs a snapshot results in two
ranges in which a replica needs a snapshot where additionally there
needs to be a sequencing between the two snapshots (one cannot apply
a snapshot for the post-split replica until the pre-split replica has
moved out of the way). The result of a long burst of splits such as
occurring in RESTORE and IMPORT operations is then an overload of the
snapshot queue with lots of wasted work, unavailable followers with
operations hanging on them, and general mayhem in the logs. Since
bulk operations also write a lot of data to the raft logs, log
truncations then create an additional snapshot burden; in short,
everything will be unhappy for a few hours and the cluster may
effectively be unavailable.

This isn't news to us and in fact was a big problem "back in 2018".
When we first started to understand the issue, we introduced a mechanism
that would delay splits (cockroachdb#32594) with the desired effect of ensuring
that, all followers had caught up to ~all of the previous splits.
This helped, but didn't explain why we were seeing snapshots in the
first place.

Investigating more, we realized that snapshots were sometimes spuriously
requested by an uninitialized replica on the right-hand side which was
contacted by another member of the right-hand side that had already been
initialized by the split executing on the left-hand side; this snapshot
was almost always unnecessary since the local left-hand side would
usually initialize the right-hand side moments later.  To address this,
in cockroachdb#32594 we started unconditionally dropping the first ~seconds worth
of requests to an uninitialized range, and the mechanism was improved in
 cockroachdb#32782 and will now only do this if a local neighboring replica is
expected to perform the split soon.

With all this in place, you would've expected us to have all bases
covered but it turns out that we are still running into issues prior
to this PR.

Concretely, whenever the aforementioned mechanism drops a message from
the leader (a MsgApp), the leader will only contact the replica every
second until it responds. It responds when it has been initialized via
its left neighbor's splits and the leader reaches out again, i.e.  an
average of ~500ms after being initialized. However, by that time, it is
itself already at the bottom of a long chain of splits, and the 500ms
delay is delaying how long it takes for the rest of the chain to get
initialized.  Since the delay compounds on each link of the chain, the
depth of the chain effectively determines the total delay experienced at
the end. This would eventually exceed the patience of the mechanism that
would suppress the snapshots, and snapshots would be requested. We would
descend into madness similar to that experienced in the absence of the
mechanism in the first place.

The mechanism in cockroachdb#32594 could have helped here, but unfortunately it
did not, as it routinely missed the fact that followers were not
initialized yet. This is because during a split burst, the replica
orchestrating the split was typically only created an instant before,
and its raft group hadn't properly transitioned to leader status yet.
This meant that in effect it wasn't delaying the splits at all.

This commit adjusts the logic to delay splits to avoid this problem.
While clamoring for leadership, the delay is upheld. Once collapsed
into a definite state, the existing logic pretty much did the right
thing, as it waited for the right-hand side to be in initialized.

Release note (bug fix): Fixed a scenario in which a rapid sequence
of splits could trigger a storm of Raft snapshots. This would be
accompanied by log messages of the form "would have dropped incoming
MsgApp, but allowing due to ..." and tended to occur as part of
RESTORE/IMPORT operations.
  • Loading branch information
tbg committed Apr 24, 2021
1 parent 9e821aa commit f21720a
Show file tree
Hide file tree
Showing 5 changed files with 417 additions and 167 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ go_test(
"client_replica_backpressure_test.go",
"client_replica_gc_test.go",
"client_replica_test.go",
"client_split_burst_test.go",
"client_split_test.go",
"client_status_test.go",
"client_tenant_test.go",
Expand Down
202 changes: 202 additions & 0 deletions pkg/kv/kvserver/client_split_burst_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"math"
"math/rand"
"sync/atomic"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

type splitBurstTest struct {
*testcluster.TestCluster
baseKey roachpb.Key
magicStickyBit hlc.Timestamp
numSplitsSeenOnSlowFollower *int32 // atomic
initialRaftSnaps int
}

func (sbt *splitBurstTest) SplitWithDelay(t *testing.T, location byte) {
t.Helper()
require.NoError(t, sbt.SplitWithDelayE(location))
}

func (sbt *splitBurstTest) SplitWithDelayE(location byte) error {
k := append([]byte(nil), sbt.baseKey...)
splitKey := append(k, location)
_, _, err := sbt.SplitRangeWithExpiration(splitKey, sbt.magicStickyBit)
return err
}

func (sbt *splitBurstTest) NumRaftSnaps(t *testing.T) int {
var totalSnaps int
for i := range sbt.Servers {
var n int // num rows (sanity check against test rotting)
var c int // num Raft snapshots
if err := sbt.ServerConn(i).QueryRow(`
SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
name = 'range.snapshots.applied-voter'
`).Scan(&n, &c); err != nil {
t.Fatal(err)
}
require.EqualValues(t, 1, n)
totalSnaps += c
}
return totalSnaps - sbt.initialRaftSnaps
}

func setupSplitBurstTest(t *testing.T, delay time.Duration) *splitBurstTest {
var magicStickyBit = hlc.Timestamp{WallTime: math.MaxInt64 - 123, Logical: 987654321}

numSplitsSeenOnSlowFollower := new(int32) // atomic
var quiesceCh <-chan struct{}
knobs := base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
TestingApplyFilter: func(args kvserverbase.ApplyFilterArgs) (int, *roachpb.Error) {
if args.Split == nil || delay == 0 {
return 0, nil
}
if args.Split.RightDesc.GetStickyBit() != magicStickyBit {
return 0, nil
}
select {
case <-time.After(delay):
case <-quiesceCh:
}
atomic.AddInt32(numSplitsSeenOnSlowFollower, 1)
return 0, nil
},
}}

ctx := context.Background()

// n1 and n3 are fast, n2 is slow (to apply the splits). We need
// three nodes here; delaying the apply loop on n2 also delays
// how quickly commands can reach quorum and would backpressure
// the splits by accident.
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgsPerNode: map[int]base.TestServerArgs{
1: {Knobs: knobs},
},
ReplicationMode: base.ReplicationManual,
})
defer t.Cleanup(func() {
tc.Stopper().Stop(ctx)
})
quiesceCh = tc.Stopper().ShouldQuiesce()

k := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, k, tc.Target(1), tc.Target(2))

sbc := &splitBurstTest{
TestCluster: tc,
baseKey: k,
magicStickyBit: magicStickyBit,
numSplitsSeenOnSlowFollower: numSplitsSeenOnSlowFollower,
}
sbc.initialRaftSnaps = sbc.NumRaftSnaps(t)
return sbc
}

func TestSplitBurstWithSlowFollower(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
t.Run("forward", func(t *testing.T) {
// When splitting at an increasing sequence of keys, in each step we split
// the most recently split range, and we expect the splits to wait for that
// range to have caught up its follower across the preceding split, which
// was delayed as well. So when we're done splitting we should have seen at
// least (numSplits-1) splits get applied on the slow follower.
// This end-to-end exercises `splitDelayHelper`.
//
// This test is fairly slow because every split will incur a 1s penalty
// (dictated by the raft leader's probing interval). We could fix this
// delay here and in production if we had a way to send a signal from
// the slow follower to the leader when the split trigger initializes
// the right-hand side. This is actually an interesting point, because
// the split trigger *replaces* a snapshot - but doesn't fully act like
// one: when a raft group applies a snapshot, it generates an MsgAppResp
// to the leader which will let the leader probe proactively. We could
// signal the split trigger to the raft group as a snapshot being applied
// (as opposed to recreating the in-memory instance as we do now), and
// then this MsgAppResp should be created automatically.

sbt := setupSplitBurstTest(t, 50*time.Millisecond)
defer sbt.Stopper().Stop(ctx)

const numSplits = byte(5)

for i := byte(0); i < numSplits; i++ {
sbt.SplitWithDelay(t, i)
}
// We should have applied all but perhaps the last split on the slow node.
// If we didn't, that indicates a failure to delay the splits accordingly.
require.GreaterOrEqual(t, atomic.LoadInt32(sbt.numSplitsSeenOnSlowFollower), int32(numSplits-1))
require.Zero(t, sbt.NumRaftSnaps(t))
})
t.Run("backward", func(t *testing.T) {
// When splitting at a decreasing sequence of keys, we're repeatedly splitting
// the first range. All of its followers are initialized to begin with, and
// even though there is a slow follower, `splitDelayHelper` isn't interested in
// delaying this here (which would imply that it's trying to check that every-
// one is "caught up").
// We set a 100s timeout so that below we can assert that `splitDelayHelper`
// isn't somehow forcing us to wait here.
infiniteDelay := 100 * time.Second
sbt := setupSplitBurstTest(t, infiniteDelay)
defer sbt.Stopper().Stop(ctx)

const numSplits = byte(50)

for i := byte(0); i < numSplits; i++ {
tBegin := timeutil.Now()
sbt.SplitWithDelay(t, numSplits-i)
if dur := timeutil.Since(tBegin); dur > infiniteDelay {
t.Fatalf("waited %s for split #%d", dur, i+1)
}
}
require.Zero(t, atomic.LoadInt32(sbt.numSplitsSeenOnSlowFollower))
require.Zero(t, sbt.NumRaftSnaps(t))
})
t.Run("random", func(t *testing.T) {
// When splitting randomly, we'll see a mixture of forward and backward
// splits, so we can't assert on how many split triggers we observe.
// However, there still shouldn't be any snapshots.
sbt := setupSplitBurstTest(t, 10*time.Millisecond)
defer sbt.Stopper().Stop(ctx)

const numSplits = 20
perm := rand.Perm(numSplits)

doSplit := func(ctx context.Context, i int) error {
return sbt.SplitWithDelayE(byte(perm[i]))
}
require.NoError(t, ctxgroup.GroupWorkers(ctx, numSplits, doSplit))

require.Zero(t, sbt.NumRaftSnaps(t))
})
}
64 changes: 0 additions & 64 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/ts"
"github.com/cockroachdb/cockroach/pkg/ts/tspb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -518,69 +517,6 @@ func TestStoreRangeSplitAtRangeBounds(t *testing.T) {
}
}

// TestSplitTriggerRaftSnapshotRace verifies that when an uninitialized Replica
// resulting from a split hasn't been initialized via the split trigger yet, a
// grace period prevents the replica from requesting an errant Raft snapshot.
// This is verified by running a number of splits and asserting that no Raft
// snapshots are observed. As a nice side effect, this also verifies that log
// truncations don't cause any Raft snapshots in this test.
func TestSplitTriggerRaftSnapshotRace(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
const numNodes = 3
var args base.TestClusterArgs
// NB: the merge queue is enabled for additional "chaos". Note that the test
// uses three nodes and so there is no replica movement, which would other-
// wise tickle Raft snapshots for unrelated reasons.
tc := testcluster.StartTestCluster(t, numNodes, args)
defer tc.Stopper().Stop(ctx)

numSplits := 100
if util.RaceEnabled {
// Running 100 splits is overkill in race builds.
numSplits = 10
}
perm := rand.Perm(numSplits)
idx := int32(-1) // accessed atomically

numRaftSnaps := func(when string) int {
var totalSnaps int
for i := 0; i < numNodes; i++ {
var n int // num rows (sanity check against test rotting)
var c int // num Raft snapshots
if err := tc.ServerConn(i).QueryRow(`
SELECT count(*), sum(value) FROM crdb_internal.node_metrics WHERE
name = 'range.snapshots.applied-voter'
`).Scan(&n, &c); err != nil {
t.Fatal(err)
}
if expRows := 1; n != expRows {
t.Fatalf("%s: expected %d rows, got %d", when, expRows, n)
}
totalSnaps += c
}
return totalSnaps
}

// There are usually no raft snaps before, but there is a race condition where
// they can occasionally happen during upreplication.
numSnapsBefore := numRaftSnaps("before")

doSplit := func(ctx context.Context, _ int) error {
_, _, err := tc.SplitRange([]byte(fmt.Sprintf("key-%d", perm[atomic.AddInt32(&idx, 1)])))
return err
}

if err := ctxgroup.GroupWorkers(ctx, numSplits, doSplit); err != nil {
t.Fatal(err)
}

// Check that no snaps happened during the splits.
require.Equal(t, numSnapsBefore, numRaftSnaps("after"))
}

// TestStoreRangeSplitIdempotency executes a split of a range and
// verifies that the resulting ranges respond to the right key ranges
// and that their stats have been properly accounted for and requests
Expand Down
Loading

0 comments on commit f21720a

Please sign in to comment.