Skip to content

Commit

Permalink
kvserver: deflake TestMergeQueue{...} tests
Browse files Browse the repository at this point in the history
Fixes #73107. Fixes #79109. Fixes #85372.

It's possible that under certain conditions the merge queue doesn't
immediately merge what these tests are expecting. One possible reasons is
span config asynchrony; the merge queue is actually disabled until each
store learns about at least some span config state: #78122 (if we don't
have any span configs available, enabling range merges would be
extremely dangerous -- we could collapse everything into a single
range).

Release justification: stability work
Release note: None
  • Loading branch information
irfansharif committed Aug 19, 2022
1 parent 04c6a1a commit 8f335cb
Showing 1 changed file with 47 additions and 60 deletions.
107 changes: 47 additions & 60 deletions pkg/kv/kvserver/client_merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4143,26 +4143,36 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) {
}
}

func verifyMerged(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey) {
func verifyMergedSoon(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey) {
t.Helper()
repl := store.LookupReplica(rhsStartKey)
if repl == nil {
t.Fatal("replica doesn't exist")
}
if !repl.Desc().StartKey.Equal(lhsStartKey) {
t.Fatalf("ranges unexpectedly unmerged expected startKey %s, but got %s", lhsStartKey, repl.Desc().StartKey)
}
testutils.SucceedsSoon(t, func() error {
store.MustForceMergeScanAndProcess()
repl := store.LookupReplica(rhsStartKey)
if repl == nil {
t.Fatal("replica doesn't exist")
}
if !repl.Desc().StartKey.Equal(lhsStartKey) {
return errors.Newf("ranges unexpectedly unmerged expected startKey %s, but got %s", lhsStartKey, repl.Desc().StartKey)
}
return nil
})
}

func verifyUnmerged(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey) {
func verifyUnmergedSoon(
t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey,
) {
t.Helper()
repl := store.LookupReplica(rhsStartKey)
if repl == nil {
t.Fatal("replica doesn't exist")
}
if repl.Desc().StartKey.Equal(lhsStartKey) {
t.Fatalf("ranges unexpectedly merged")
}
testutils.SucceedsSoon(t, func() error {
store.MustForceMergeScanAndProcess()
repl := store.LookupReplica(rhsStartKey)
if repl == nil {
t.Fatal("replica doesn't exist")
}
if repl.Desc().StartKey.Equal(lhsStartKey) {
return errors.Newf("ranges unexpectedly merged")
}
return nil
})
}

func TestMergeQueue(t *testing.T) {
Expand Down Expand Up @@ -4268,27 +4278,23 @@ func TestMergeQueue(t *testing.T) {
t.Run("sanity", func(t *testing.T) {
// Check that ranges are not trivially merged after reset.
reset(t)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
reset(t)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})

t.Run("both-empty", func(t *testing.T) {
reset(t)
clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, lhsStartKey, rhsStartKey)
verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
})

t.Run("lhs-undersize", func(t *testing.T) {
reset(t)
conf := conf
conf.RangeMinBytes *= 2
lhs().SetSpanConfig(conf)
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, lhsStartKey, rhsStartKey)
verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
})

t.Run("combined-size-threshold", func(t *testing.T) {
Expand All @@ -4300,22 +4306,17 @@ func TestMergeQueue(t *testing.T) {
conf.RangeMinBytes = rhs().GetMVCCStats().Total() + 1
conf.RangeMaxBytes = lhs().GetMVCCStats().Total() + rhs().GetMVCCStats().Total() - 1
setSpanConfigs(t, conf)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)

// Once the maximum size threshold is increased, the merge can occur.
conf.RangeMaxBytes += 1
setSpanConfigs(t, conf)
l := lhs().RangeID
r := rhs().RangeID
log.Infof(ctx, "Left=%s, Right=%s", l, r)
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, lhsStartKey, rhsStartKey)
verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
})

t.Run("non-collocated", func(t *testing.T) {
reset(t)
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
tc.AddVotersOrFatal(t, rhs().Desc().StartKey.AsRawKey(), tc.Target(1))
tc.TransferRangeLeaseOrFatal(t, *rhs().Desc(), tc.Target(1))
// NB: We're running on a 2 node `TestCluster` so we can count on the
Expand All @@ -4324,8 +4325,7 @@ func TestMergeQueue(t *testing.T) {
tc.RemoveVotersOrFatal(t, rhs().Desc().StartKey.AsRawKey(), tc.Target(0))

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, lhsStartKey, rhsStartKey)
verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
})

t.Run("load-based-merging", func(t *testing.T) {
Expand Down Expand Up @@ -4371,8 +4371,7 @@ func TestMergeQueue(t *testing.T) {
lhs().LoadBasedSplitter().Reset(tc.Servers[0].Clock().PhysicalTime())

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})

t.Run("unreliable-rhs-qps", func(t *testing.T) {
Expand All @@ -4381,8 +4380,7 @@ func TestMergeQueue(t *testing.T) {
rhs().LoadBasedSplitter().Reset(tc.Servers[1].Clock().PhysicalTime())

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})

t.Run("combined-qps-above-threshold", func(t *testing.T) {
Expand All @@ -4393,8 +4391,7 @@ func TestMergeQueue(t *testing.T) {
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(moreThanHalfQPS))

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)
})

t.Run("combined-qps-below-threshold", func(t *testing.T) {
Expand All @@ -4406,21 +4403,18 @@ func TestMergeQueue(t *testing.T) {
lhs().LoadBasedSplitter().RecordMax(tc.Servers[1].Clock().PhysicalTime(), float64(lessThanHalfQPS))

clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, lhsStartKey, rhsStartKey)
verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
})
})

t.Run("sticky-bit", func(t *testing.T) {
reset(t)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)

// Perform manual merge and verify that no merge occurred.
split(t, rhsStartKey.AsRawKey(), hlc.MaxTimestamp /* expirationTime */)
clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)

// Delete sticky bit and verify that merge occurs.
unsplitArgs := &roachpb.AdminUnsplitRequest{
Expand All @@ -4431,15 +4425,13 @@ func TestMergeQueue(t *testing.T) {
if _, err := kv.SendWrapped(ctx, store.DB().NonTransactionalSender(), unsplitArgs); err != nil {
t.Fatal(err)
}
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, lhsStartKey, rhsStartKey)
verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
})

t.Run("sticky-bit-expiration", func(t *testing.T) {
manualSplitTTL := 10 * time.Millisecond
reset(t)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)

// Pause the clock for the duration of this subtest so that
// the sticky bit does not expire prematurely.
Expand All @@ -4450,18 +4442,15 @@ func TestMergeQueue(t *testing.T) {
exp := tc.Servers[0].Clock().Now().Add(manualSplitTTL.Nanoseconds(), 0)
split(t, rhsStartKey.AsRawKey(), exp /* expirationTime */)
clearRange(t, lhsStartKey, rhsEndKey)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)

// Sticky bit is not expired yet.
manualClock.Increment(manualSplitTTL.Nanoseconds() / 2)
store.MustForceMergeScanAndProcess()
verifyUnmerged(t, store, lhsStartKey, rhsStartKey)
verifyUnmergedSoon(t, store, lhsStartKey, rhsStartKey)

// Sticky bit is expired.
manualClock.Increment(manualSplitTTL.Nanoseconds())
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, lhsStartKey, rhsStartKey)
verifyMergedSoon(t, store, lhsStartKey, rhsStartKey)
})
}

Expand Down Expand Up @@ -4663,8 +4652,7 @@ func TestMergeQueueSeesNonVoters(t *testing.T) {
rightDesc = tc.LookupRangeOrFatal(t, rightDesc.StartKey.AsRawKey())

store.SetMergeQueueActive(true)
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, leftDesc.StartKey, rightDesc.StartKey)
verifyMergedSoon(t, store, leftDesc.StartKey, rightDesc.StartKey)
})
}
}
Expand Down Expand Up @@ -4749,8 +4737,7 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) {
return nil
})
store.SetMergeQueueActive(true)
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, leftDesc.StartKey, rightDesc.StartKey)
verifyMergedSoon(t, store, leftDesc.StartKey, rightDesc.StartKey)
}

func TestInvalidSubsumeRequest(t *testing.T) {
Expand Down

0 comments on commit 8f335cb

Please sign in to comment.