
Commit

Merge pull request cockroachdb#23794 from nvanbenschoten/nvanbenschoten/23762

cherrypick-2.0: storage: check leaseholder status in AdminSplit retry loop
nvanbenschoten authored Mar 13, 2018
2 parents: abadf9f + 49301be · commit 41f9808
Showing 5 changed files with 212 additions and 4 deletions.
177 changes: 177 additions & 0 deletions pkg/storage/client_split_test.go
@@ -1490,6 +1490,183 @@ func TestStoreSplitTimestampCacheDifferentLeaseHolder(t *testing.T) {
	}
}

// TestStoreSplitOnRemovedReplica prevents regression of #23673. In that issue,
// it was observed that the retry loop in AdminSplit could go into an infinite
// loop if the replica it was being run on had been removed from the range. The
// loop now checks that the replica performing the split is the leaseholder
// before each iteration.
func TestStoreSplitOnRemovedReplica(t *testing.T) {
	defer leaktest.AfterTest(t)()

	leftKey := roachpb.Key("a")
	splitKey := roachpb.Key("b")
	rightKey := roachpb.Key("c")

	var newDesc roachpb.RangeDescriptor
	inFilter := make(chan struct{}, 1)
	beginBlockingSplit := make(chan struct{})
	finishBlockingSplit := make(chan struct{})
	filter := func(ba roachpb.BatchRequest) *roachpb.Error {
		// Block replica 1's attempt to perform the AdminSplit. We detect the
		// split's range descriptor update and block until the rest of the test
		// is ready. We then return a ConditionFailedError, simulating a
		// descriptor update race.
		if ba.Replica.NodeID == 1 {
			for _, req := range ba.Requests {
				if cput, ok := req.GetInner().(*roachpb.ConditionalPutRequest); ok {
					leftDescKey := keys.RangeDescriptorKey(roachpb.RKey(leftKey))
					if cput.Key.Equal(leftDescKey) {
						var desc roachpb.RangeDescriptor
						if err := cput.Value.GetProto(&desc); err != nil {
							panic(err)
						}

						if desc.EndKey.Equal(splitKey) {
							select {
							case <-beginBlockingSplit:
								select {
								case inFilter <- struct{}{}:
									// Let the test know we're in the filter.
								default:
								}
								<-finishBlockingSplit

								var val roachpb.Value
								if err := val.SetProto(&newDesc); err != nil {
									panic(err)
								}
								return roachpb.NewError(&roachpb.ConditionFailedError{
									ActualValue: &val,
								})
							default:
							}
						}
					}
				}
			}
		}
		return nil
	}

	var args base.TestClusterArgs
	args.ReplicationMode = base.ReplicationManual
	args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{
		TestingRequestFilter: filter,
	}

	tc := testcluster.StartTestCluster(t, 3, args)
	defer tc.Stopper().Stop(context.TODO())

	// Split the data range, mainly to avoid other splits getting in our way.
	for _, k := range []roachpb.Key{leftKey, rightKey} {
		if _, _, err := tc.SplitRange(k); err != nil {
			t.Fatal(errors.Wrapf(err, "split at %s", k))
		}
	}

	// Send an AdminSplit request to the replica. In the filter above we'll
	// block the first cput in this split until we're ready to let it loose
	// again, which will be after we remove the replica from the range.
	splitRes := make(chan error)
	close(beginBlockingSplit)
	go func() {
		_, _, err := tc.SplitRange(splitKey)
		splitRes <- err
	}()
	<-inFilter

	// Move the range from node 0 to node 1. Then add node 2 to the range.
	// Node 0 will never hear about this range descriptor update.
	var err error
	if newDesc, err = tc.AddReplicas(leftKey, tc.Target(1)); err != nil {
		t.Fatal(err)
	}
	if err := tc.TransferRangeLease(newDesc, tc.Target(1)); err != nil {
		t.Fatal(err)
	}
	if _, err := tc.RemoveReplicas(leftKey, tc.Target(0)); err != nil {
		t.Fatal(err)
	}
	if newDesc, err = tc.AddReplicas(leftKey, tc.Target(2)); err != nil {
		t.Fatal(err)
	}

	// Stop blocking the split request's cput. This will cause the cput to fail
	// with a ConditionFailedError. The error will warrant a retry in
	// AdminSplit's retry loop, but when the removed replica notices that it is
	// no longer the leaseholder, it will return a NotLeaseHolderError. This in
	// turn will allow the AdminSplit to be re-routed to the new leaseholder,
	// where it will succeed.
	close(finishBlockingSplit)
	if err = <-splitRes; err != nil {
		t.Errorf("AdminSplit returned error: %v", err)
	}
}

// TestStoreSplitFailsAfterMaxRetries prevents regression of #23310. It
// ensures that an AdminSplit attempt will retry a limited number of times
// before returning unsuccessfully.
func TestStoreSplitFailsAfterMaxRetries(t *testing.T) {
	defer leaktest.AfterTest(t)()

	leftKey := roachpb.Key("a")
	splitKey := roachpb.Key("b")
	rightKey := roachpb.Key("c")

	var splitAttempts int64
	filter := func(ba roachpb.BatchRequest) *roachpb.Error {
		// Intercept and fail replica 1's attempt to perform the AdminSplit.
		// We detect the split's range descriptor update and return an
		// AmbiguousResultError, which is retried.
		for _, req := range ba.Requests {
			if cput, ok := req.GetInner().(*roachpb.ConditionalPutRequest); ok {
				leftDescKey := keys.RangeDescriptorKey(roachpb.RKey(leftKey))
				if cput.Key.Equal(leftDescKey) {
					var desc roachpb.RangeDescriptor
					if err := cput.Value.GetProto(&desc); err != nil {
						panic(err)
					}

					if desc.EndKey.Equal(splitKey) {
						atomic.AddInt64(&splitAttempts, 1)
						return roachpb.NewError(&roachpb.AmbiguousResultError{})
					}
				}
			}
		}
		return nil
	}

	var args base.TestClusterArgs
	args.ReplicationMode = base.ReplicationManual
	args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{
		TestingRequestFilter: filter,
	}

	tc := testcluster.StartTestCluster(t, 1, args)
	defer tc.Stopper().Stop(context.TODO())

	// Split the data range, mainly to avoid other splits getting in our way.
	for _, k := range []roachpb.Key{leftKey, rightKey} {
		if _, _, err := tc.SplitRange(k); err != nil {
			t.Fatal(errors.Wrapf(err, "split at %s", k))
		}
	}

	// Send an AdminSplit request to the replica. In the filter above we'll
	// continue to return AmbiguousResultErrors. The split retry loop should
	// retry a few times before exiting unsuccessfully.
	_, _, err := tc.SplitRange(splitKey)
	if !testutils.IsError(err, "split at key .* failed: result is ambiguous") {
		t.Errorf("unexpected error from SplitRange: %v", err)
	}

	const expAttempts = 11 // 10 retries
	if splitAttempts != expAttempts {
		t.Errorf("expected %d split attempts, found %d", expAttempts, splitAttempts)
	}
}

func TestStoreSplitGCThreshold(t *testing.T) {
	defer leaktest.AfterTest(t)()
	storeCfg := storage.TestStoreConfig(nil)
6 changes: 6 additions & 0 deletions pkg/storage/replica.go
@@ -1878,6 +1878,12 @@ func (r *Replica) Send(
		return nil, roachpb.NewError(err)
	}

	if filter := r.store.cfg.TestingKnobs.TestingRequestFilter; filter != nil {
		if pErr := filter(ba); pErr != nil {
			return nil, pErr
		}
	}

	// Differentiate between admin, read-only and write.
	var pErr *roachpb.Error
	if useRaft {
21 changes: 17 additions & 4 deletions pkg/storage/replica_command.go
@@ -705,12 +705,24 @@ func runCommitTrigger(
 // AdminSplit divides the range into two ranges using args.SplitKey.
 func (r *Replica) AdminSplit(
 	ctx context.Context, args roachpb.AdminSplitRequest,
-) (roachpb.AdminSplitResponse, *roachpb.Error) {
+) (reply roachpb.AdminSplitResponse, pErr *roachpb.Error) {
 	if len(args.SplitKey) == 0 {
 		return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided")
 	}
-	for retryable := retry.StartWithCtx(ctx, base.DefaultRetryOptions()); retryable.Next(); {
-		reply, _, pErr := r.adminSplitWithDescriptor(ctx, args, r.Desc())
+
+	retryOpts := base.DefaultRetryOptions()
+	retryOpts.MaxRetries = 10
+	for retryable := retry.StartWithCtx(ctx, retryOpts); retryable.Next(); {
+		// Admin commands always require the range lease to begin (see
+		// executeAdminBatch), but we may have lost it while in this retry loop.
+		// Without the lease, a replica's local descriptor can be arbitrarily
+		// stale, which will result in a ConditionFailedError. To avoid this,
+		// we make sure that we still have the lease before each attempt.
+		if _, pErr = r.redirectOnOrAcquireLease(ctx); pErr != nil {
+			return roachpb.AdminSplitResponse{}, pErr
+		}
+
+		reply, _, pErr = r.adminSplitWithDescriptor(ctx, args, r.Desc())
 		// On seeing a ConditionFailedError or an AmbiguousResultError, retry the
 		// command with the updated descriptor.
 		switch pErr.GetDetail().(type) {
@@ -720,7 +732,8 @@ func (r *Replica) AdminSplit(
 			return reply, pErr
 		}
 	}
-	return roachpb.AdminSplitResponse{}, roachpb.NewError(ctx.Err())
+	// If we broke out of the loop after MaxRetries, return the last error.
+	return roachpb.AdminSplitResponse{}, pErr
 }
 
 func maybeDescriptorChangedError(desc *roachpb.RangeDescriptor, err error) (string, bool) {
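For reference, below is a minimal sketch of the retry shape the hunk above introduces. This is not code from the commit: splitWithRetries, checkLease, and attemptOnce are hypothetical stand-ins for AdminSplit, redirectOnOrAcquireLease, and adminSplitWithDescriptor. The lease check before every attempt, the bounded MaxRetries, and the fall-through that surfaces the last error follow the same pattern; with MaxRetries = 10 the loop makes at most 11 attempts (one initial try plus 10 retries), which is exactly what TestStoreSplitFailsAfterMaxRetries asserts.

package storage_test // hypothetical illustration only; not part of this commit

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/retry"
)

// splitWithRetries mirrors the bounded retry loop added to AdminSplit above.
// checkLease and attemptOnce stand in for redirectOnOrAcquireLease and
// adminSplitWithDescriptor.
func splitWithRetries(
	ctx context.Context,
	checkLease func(context.Context) *roachpb.Error,
	attemptOnce func(context.Context) *roachpb.Error,
) *roachpb.Error {
	retryOpts := base.DefaultRetryOptions()
	retryOpts.MaxRetries = 10 // one initial attempt + 10 retries = 11 attempts

	var pErr *roachpb.Error
	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
		// Losing the lease mid-loop means the local descriptor can be stale,
		// so bail out instead of spinning on ConditionFailedErrors forever.
		if pErr = checkLease(ctx); pErr != nil {
			return pErr
		}
		pErr = attemptOnce(ctx)
		switch pErr.GetDetail().(type) {
		case *roachpb.ConditionFailedError, *roachpb.AmbiguousResultError:
			// The descriptor changed underneath us; retry with a fresh copy.
		default:
			// Success (nil) or a non-retriable error: stop here.
			return pErr
		}
	}
	// The retry budget is exhausted (or ctx was canceled); return the last error.
	return pErr
}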
6 changes: 6 additions & 0 deletions pkg/storage/storagebase/base.go
@@ -55,6 +55,12 @@ func (f *FilterArgs) InRaftCmd() bool {
	return f.CmdID != ""
}

// ReplicaRequestFilter can be used in testing to influence the error returned
// from a request before it is evaluated. Notably, the filter is run before the
// request is added to the CommandQueue, so blocking in the filter will not
// block interfering requests.
type ReplicaRequestFilter func(roachpb.BatchRequest) *roachpb.Error

// ReplicaCommandFilter may be used in tests through the StoreTestingKnobs to
// intercept the handling of commands and artificially generate errors. Return
// nil to continue with regular processing or non-nil to terminate processing
6 changes: 6 additions & 0 deletions pkg/storage/store.go
@@ -644,6 +644,12 @@ type StoreConfig struct {
type StoreTestingKnobs struct {
	EvalKnobs batcheval.TestingKnobs

	// TestingRequestFilter is called before evaluating each command on a
	// replica. The filter is run before the request is added to the
	// CommandQueue, so blocking in the filter will not block interfering
	// requests. If it returns an error, the command will not be evaluated.
	TestingRequestFilter storagebase.ReplicaRequestFilter

	// TestingProposalFilter is called before proposing each command.
	TestingProposalFilter storagebase.ReplicaProposalFilter

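For reference, a minimal usage sketch of the new knob, distilled from the tests in this commit. The test name, blockedKey, and the injected error message are hypothetical; only the wiring through StoreTestingKnobs.TestingRequestFilter and the filter signature come from the change itself.

package storage_test // hypothetical illustration only; not part of this commit

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestRequestFilterSketch(t *testing.T) {
	blockedKey := roachpb.Key("sketch-blocked-key")

	// Returning a non-nil error from the filter short-circuits evaluation of
	// the batch before it reaches the CommandQueue.
	filter := func(ba roachpb.BatchRequest) *roachpb.Error {
		for _, req := range ba.Requests {
			if cput, ok := req.GetInner().(*roachpb.ConditionalPutRequest); ok {
				if cput.Key.Equal(blockedKey) {
					return roachpb.NewErrorf("injected failure for %s", cput.Key)
				}
			}
		}
		return nil // evaluate everything else normally
	}

	var args base.TestClusterArgs
	args.ReplicationMode = base.ReplicationManual
	args.ServerArgs.Knobs.Store = &storage.StoreTestingKnobs{
		TestingRequestFilter: filter,
	}
	tc := testcluster.StartTestCluster(t, 1, args)
	defer tc.Stopper().Stop(context.TODO())

	// Any ConditionalPut against blockedKey on this cluster now fails with
	// the injected error; all other requests are unaffected.
}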
