Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: mvccExportToWriter should always return buffered range keys #98289

Merged
merged 1 commit into from
Mar 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pkg/storage/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6432,7 +6432,7 @@ func mvccExportToWriter(
firstIteration := true
// skipTombstones controls whether we include tombstones.
//
// We want tombstones if we are exporting all reivions or if
// We want tombstones if we are exporting all revisions or if
// we have a StartTS. A non-empty StartTS is used by
// incremental backups and thus needs to see tombstones if
// that happens to be the latest value.
Expand All @@ -6441,7 +6441,10 @@ func mvccExportToWriter(
var rows RowCounter
// Only used if trackKeyBoundary is true.
var curKey roachpb.Key

var resumeKey MVCCKey
var resumeIsCPUOverLimit bool

var rangeKeys MVCCRangeKeyStack
var rangeKeysSize int64

Expand Down Expand Up @@ -6513,7 +6516,8 @@ func mvccExportToWriter(
if isNewKey {
resumeKey.Timestamp = hlc.Timestamp{}
}
return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey, CPUOverlimit: true}, nil
resumeIsCPUOverLimit = true
break
}
}

Expand Down Expand Up @@ -6722,7 +6726,7 @@ func mvccExportToWriter(
rows.BulkOpSummary.DataSize += rangeKeysSize
}

return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey}, nil
return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey, CPUOverlimit: resumeIsCPUOverLimit}, nil
}

// MVCCExportOptions contains options for MVCCExportToSST.
Expand Down
177 changes: 137 additions & 40 deletions pkg/storage/mvcc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -6141,6 +6142,10 @@ func TestWillOverflow(t *testing.T) {
// in which mis-handling of resume spans would cause MVCCExportToSST
// to return an empty resume key in cases where the resource limiters
// caused an early return of a resume span.
//
// NB: That this test treats the result of MVCCExportToSST _without_
// CPU rate limiting as the truth. Bugs that affect all exports will
// not be caught by this test.
func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand All @@ -6152,26 +6157,74 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
maxKey = int64(1000)
minTimestamp = hlc.Timestamp{WallTime: 100000}
maxTimestamp = hlc.Timestamp{WallTime: 200000}

exportAllQuery = queryLimits{
minKey: minKey,
maxKey: maxKey,
minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp,
latest: false,
}
)

assertExportEqualWithOptions := func(t *testing.T, ctx context.Context, engine Engine, expectedData []MVCCKey, initialOpts MVCCExportOptions) {
dataIndex := 0
// When ExportRequest is interrupted by the CPU limiter, the currently
// buffered range key stack will have its EndKey truncated to the resume
// key. To account for this, we write all of the range keys back into a
// store and then export them out again without interruption.
canonicalizeRangeKeys := func(in []MVCCRangeKeyStack) []MVCCRangeKeyStack {
if len(in) == 0 {
return in
}

engine := createTestPebbleEngine()
defer engine.Close()
for _, keyStack := range in {
for _, version := range keyStack.Versions {
require.NoError(t, engine.PutRawMVCCRangeKey(keyStack.AsRangeKey(version), []byte{}))
}
}
require.NoError(t, engine.Flush())
keys, rKeys := exportAllData(t, engine, exportAllQuery)
require.Equal(t, 0, len(keys))
return rKeys
}

assertExportEqualWithOptions := func(t *testing.T, ctx context.Context, engine Engine,
expectedKeys []MVCCKey,
expectedRangeKeys []MVCCRangeKeyStack,
initialOpts MVCCExportOptions) {

keysIndex := 0
rKeysBuf := []MVCCRangeKeyStack{}

startKey := initialOpts.StartKey
for len(startKey.Key) > 0 {
var sstFile bytes.Buffer
opts := initialOpts
opts.StartKey = startKey
_, resumeInfo, err := MVCCExportToSST(ctx, st, engine, opts, &sstFile)
require.NoError(t, err)
chunk := sstToKeys(t, sstFile.Bytes())
require.LessOrEqual(t, len(chunk), len(expectedData)-dataIndex, "remaining test data")
for _, key := range chunk {
require.True(t, key.Equal(expectedData[dataIndex]), "returned key is not equal")
dataIndex++

keys, rangeKeys := sstToKeys(t, sstFile.Bytes())

require.LessOrEqual(t, len(keys), len(expectedKeys)-keysIndex, "remaining test key data")

for _, key := range keys {
require.True(t, key.Equal(expectedKeys[keysIndex]), "returned key is not equal")
keysIndex++
}
rKeysBuf = append(rKeysBuf, rangeKeys...)
startKey = resumeInfo.ResumeKey
}
require.Equal(t, len(expectedData), dataIndex, "not all expected data was consumed")
require.Equal(t, len(expectedKeys), keysIndex, "not all expected keys were consumed")

actualRangeKeys := canonicalizeRangeKeys(rKeysBuf)
require.Equal(t, len(expectedRangeKeys), len(actualRangeKeys))
for i, actual := range actualRangeKeys {
expected := expectedRangeKeys[i]
require.True(t, actual.Equal(expected), "range key mismatch %v != %v", actual, expected)
}

}
t.Run("elastic CPU limit exhausted",
func(t *testing.T) {
Expand All @@ -6186,13 +6239,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
tombstoneChance: 0.01,
}
generateData(t, engine, limits, (limits.maxKey-limits.minKey)*10)
data := exportAllData(t, engine, queryLimits{
minKey: minKey,
maxKey: maxKey,
minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp,
latest: false,
})
keys, rKeys := exportAllData(t, engine, exportAllQuery)

// Our ElasticCPUWorkHandle will fail on the very first call. As a result,
// the very first return from MVCCExportToSST will actually contain no
Expand All @@ -6205,7 +6252,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
}
return false, 0
}))
assertExportEqualWithOptions(t, ctx, engine, data, MVCCExportOptions{
assertExportEqualWithOptions(t, ctx, engine, keys, rKeys, MVCCExportOptions{
StartKey: MVCCKey{Key: testKey(limits.minKey), Timestamp: limits.minTimestamp},
EndKey: testKey(limits.maxKey),
StartTS: limits.minTimestamp,
Expand All @@ -6227,26 +6274,50 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
tombstoneChance: 0.01,
}
generateData(t, engine, limits, (limits.maxKey-limits.minKey)*10)
data := exportAllData(t, engine, queryLimits{
minKey: minKey,
maxKey: maxKey,
minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp,
latest: false,
keys, rKeys := exportAllData(t, engine, exportAllQuery)

// Our ElasticCPUWorkHandle will always fail. But, we
// should still make progress, one key at a time.
ctx := admission.ContextWithElasticCPUWorkHandle(context.Background(), admission.TestingNewElasticCPUHandleWithCallback(func() (bool, time.Duration) {
return true, 0
}))
assertExportEqualWithOptions(t, ctx, engine, keys, rKeys, MVCCExportOptions{
StartKey: MVCCKey{Key: testKey(limits.minKey), Timestamp: limits.minTimestamp},
EndKey: testKey(limits.maxKey),
StartTS: limits.minTimestamp,
EndTS: limits.maxTimestamp,
ExportAllRevisions: true,
})
})
t.Run("elastic CPU limit always exhausted with range keys",
func(t *testing.T) {
engine := createTestPebbleEngine()
defer engine.Close()
limits := dataLimits{
minKey: minKey,
maxKey: maxKey,
minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp,
tombstoneChance: 0.50,
useRangeTombstones: true,
}
// Adding many range keys makes this test much slower,
// so we use 2*keyRange rather than 10*keyRange here.
generateData(t, engine, limits, (limits.maxKey-limits.minKey)*2)
keys, rKeys := exportAllData(t, engine, exportAllQuery)

// Our ElasticCPUWorkHandle will always
// fail. But, we should still make progress,
// one key at a time.
// Our ElasticCPUWorkHandle will always fail. But, we
// should still make progress, one key at a time.
ctx := admission.ContextWithElasticCPUWorkHandle(context.Background(), admission.TestingNewElasticCPUHandleWithCallback(func() (bool, time.Duration) {
return false, 0
return true, 0
}))
assertExportEqualWithOptions(t, ctx, engine, data, MVCCExportOptions{
assertExportEqualWithOptions(t, ctx, engine, keys, rKeys, MVCCExportOptions{
StartKey: MVCCKey{Key: testKey(limits.minKey), Timestamp: limits.minTimestamp},
EndKey: testKey(limits.maxKey),
StartTS: limits.minTimestamp,
EndTS: limits.maxTimestamp,
ExportAllRevisions: true,
StopMidKey: true,
})
})
t.Run("elastic CPU limit exhausted respects StopMidKey",
Expand Down Expand Up @@ -6299,7 +6370,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
// revisions or 0 revisions.
_, _, err := MVCCExportToSST(ctx, st, engine, opts, &sstFile)
require.NoError(t, err)
chunk := sstToKeys(t, sstFile.Bytes())
chunk, _ := sstToKeys(t, sstFile.Bytes())
require.Equal(t, 6, len(chunk))

// With StopMidKey=true, we can stop in the
Expand All @@ -6309,7 +6380,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
opts.StopMidKey = true
_, _, err = MVCCExportToSST(ctx, st, engine, opts, &sstFile)
require.NoError(t, err)
chunk = sstToKeys(t, sstFile.Bytes())
chunk, _ = sstToKeys(t, sstFile.Bytes())
// We expect 3 here rather than 2 because the
// first iteration never calls the handler.
require.Equal(t, 3, len(chunk))
Expand All @@ -6330,14 +6401,17 @@ func testKey(id int64) roachpb.Key {
}

type dataLimits struct {
minKey int64
maxKey int64
minTimestamp hlc.Timestamp
maxTimestamp hlc.Timestamp
tombstoneChance float64
minKey int64
maxKey int64
minTimestamp hlc.Timestamp
maxTimestamp hlc.Timestamp
tombstoneChance float64
useRangeTombstones bool
}

func exportAllData(t *testing.T, engine Engine, limits queryLimits) []MVCCKey {
func exportAllData(
t *testing.T, engine Engine, limits queryLimits,
) ([]MVCCKey, []MVCCRangeKeyStack) {
st := cluster.MakeTestingClusterSettings()
var sstFile bytes.Buffer
_, _, err := MVCCExportToSST(context.Background(), st, engine, MVCCExportOptions{
Expand All @@ -6351,9 +6425,11 @@ func exportAllData(t *testing.T, engine Engine, limits queryLimits) []MVCCKey {
return sstToKeys(t, sstFile.Bytes())
}

func sstToKeys(t *testing.T, data []byte) []MVCCKey {
func sstToKeys(t *testing.T, data []byte) ([]MVCCKey, []MVCCRangeKeyStack) {
var results []MVCCKey
var rangeKeyRes []MVCCRangeKeyStack
it, err := NewMemSSTIterator(data, false, IterOptions{
KeyTypes: pebble.IterKeyTypePointsAndRanges,
LowerBound: keys.MinKey,
UpperBound: keys.MaxKey,
})
Expand All @@ -6365,26 +6441,47 @@ func sstToKeys(t *testing.T, data []byte) []MVCCKey {
if !ok {
break
}

if it.RangeKeyChanged() {
hasPoint, hasRange := it.HasPointAndRange()
if hasRange {
rangeKeyRes = append(rangeKeyRes, it.RangeKeys().Clone())
}
if !hasPoint {
it.Next()
continue
}
}

results = append(results, MVCCKey{
Key: append(roachpb.Key(nil), it.UnsafeKey().Key...),
Timestamp: it.UnsafeKey().Timestamp,
})
it.Next()
}
return results
return results, rangeKeyRes
}

func generateData(t *testing.T, engine Engine, limits dataLimits, totalEntries int64) {
rng := rand.New(rand.NewSource(timeutil.Now().Unix()))
for i := int64(0); i < totalEntries; i++ {
key := testKey(limits.minKey + rand.Int63n(limits.maxKey-limits.minKey))
keyID := limits.minKey + rand.Int63n(limits.maxKey-limits.minKey)
key := testKey(keyID)
timestamp := limits.minTimestamp.Add(rand.Int63n(limits.maxTimestamp.WallTime-limits.minTimestamp.WallTime), 0)
size := 256
if rng.Float64() < limits.tombstoneChance {
size = 0
}
value := MVCCValue{Value: roachpb.MakeValueFromBytes(randutil.RandBytes(rng, size))}
require.NoError(t, engine.PutMVCC(MVCCKey{Key: key, Timestamp: timestamp}, value), "Write data to test storage")

if limits.useRangeTombstones && size == 0 {
require.NoError(t, engine.PutRawMVCCRangeKey(MVCCRangeKey{
StartKey: key,
EndKey: testKey(keyID + 2),
Timestamp: timestamp}, []byte{}), "write data to test storage")
} else {
value := MVCCValue{Value: roachpb.MakeValueFromBytes(randutil.RandBytes(rng, size))}
require.NoError(t, engine.PutMVCC(MVCCKey{Key: key, Timestamp: timestamp}, value), "Write data to test storage")
}
}
require.NoError(t, engine.Flush(), "Flush engine data")
}
Expand Down