storage: mvccExportToWriter should always return buffered range keys
In cockroachdb#96691, we changed the return type of mvccExportToWriter so that
it now indicates when a CPU limit has been reached. As part of that
change, when the CPU limit was reached, we would immediately `return`
rather than `break` out of the loop. This introduced a bug, since the
code after the loop that the `break` used to take us to is important:
we may have previously buffered range keys that still need to be
written into our response. By replacing the break with a return, these
range keys were lost.
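
For illustration only, here is a minimal, self-contained sketch of the
control-flow pattern at issue, with hypothetical names (`exportSketch`,
`buffered`, `overLimit`) rather than the actual mvccExportToWriter code: an
early `return` inside the loop skips the flush step that runs after the loop,
while a `break` still reaches it.

```go
package main

import "fmt"

// exportSketch is a hypothetical stand-in for the export loop: it buffers
// items while iterating and flushes the buffer after the loop. The flush
// must run even when the loop stops early due to a resource limit.
func exportSketch(items []string, overLimit func() bool) (written []string, resumed bool) {
	var buffered []string
	for _, it := range items {
		if overLimit() {
			// Returning here would silently drop `buffered`; breaking
			// falls through to the flush below, which is the point of
			// restoring the `break`.
			resumed = true
			break
		}
		buffered = append(buffered, it)
	}
	// Code after the loop that must always run: write out buffered items.
	written = append(written, buffered...)
	return written, resumed
}

func main() {
	calls := 0
	overLimit := func() bool { calls++; return calls > 2 } // simulated CPU limit
	out, resumed := exportSketch([]string{"a", "b", "c", "d"}, overLimit)
	fmt.Println(out, resumed) // [a b] true: buffered work is still written
}
```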

The end-user impact is that a BACKUP that _ought_ to have included
range keys (such as a backup of a table with a rolled-back IMPORT)
would not include those range keys and would thus end up resurrecting
deleted keys upon restore.

This PR brings back the `break` and adds a test that covers exporting
with range keys under CPU exhaustion.

This bug only ever existed on master.

Informs cockroachdb#97694

Epic: none

Release note: None
stevendanna committed Mar 9, 2023
1 parent fa6f3f6 commit 74c8915
Showing 2 changed files with 144 additions and 43 deletions.
10 changes: 7 additions & 3 deletions pkg/storage/mvcc.go
@@ -6432,7 +6432,7 @@ func mvccExportToWriter(
firstIteration := true
// skipTombstones controls whether we include tombstones.
//
// We want tombstones if we are exporting all reivions or if
// We want tombstones if we are exporting all revisions or if
// we have a StartTS. A non-empty StartTS is used by
// incremental backups and thus needs to see tombstones if
// that happens to be the latest value.
@@ -6441,7 +6441,10 @@ func mvccExportToWriter(
var rows RowCounter
// Only used if trackKeyBoundary is true.
var curKey roachpb.Key

var resumeKey MVCCKey
var resumeIsCPUOverLimit bool

var rangeKeys MVCCRangeKeyStack
var rangeKeysSize int64

@@ -6513,7 +6516,8 @@ func mvccExportToWriter(
if isNewKey {
resumeKey.Timestamp = hlc.Timestamp{}
}
return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey, CPUOverlimit: true}, nil
resumeIsCPUOverLimit = true
break
}
}

@@ -6722,7 +6726,7 @@ func mvccExportToWriter(
rows.BulkOpSummary.DataSize += rangeKeysSize
}

return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey}, nil
return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey, CPUOverlimit: resumeIsCPUOverLimit}, nil
}

// MVCCExportOptions contains options for MVCCExportToSST.
177 changes: 137 additions & 40 deletions pkg/storage/mvcc_test.go
@@ -45,6 +45,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble"
"github.com/kr/pretty"
"github.com/stretchr/testify/require"
)
@@ -6141,6 +6142,10 @@ func TestWillOverflow(t *testing.T) {
// in which mis-handling of resume spans would cause MVCCExportToSST
// to return an empty resume key in cases where the resource limiters
// caused an early return of a resume span.
//
// NB: This test treats the result of MVCCExportToSST _without_
// CPU rate limiting as the truth. Bugs that affect all exports will
// not be caught by this test.
func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
@@ -6152,26 +6157,74 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
maxKey = int64(1000)
minTimestamp = hlc.Timestamp{WallTime: 100000}
maxTimestamp = hlc.Timestamp{WallTime: 200000}

exportAllQuery = queryLimits{
minKey: minKey,
maxKey: maxKey,
minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp,
latest: false,
}
)

assertExportEqualWithOptions := func(t *testing.T, ctx context.Context, engine Engine, expectedData []MVCCKey, initialOpts MVCCExportOptions) {
dataIndex := 0
// When ExportRequest is interrupted by the CPU limiter, the currently
// buffered range key stack will have its EndKey truncated to the resume
// key. To account for this, we write all of the range keys back into a
// store and then export them out again without interruption.
canonicalizeRangeKeys := func(in []MVCCRangeKeyStack) []MVCCRangeKeyStack {
if len(in) == 0 {
return in
}

engine := createTestPebbleEngine()
defer engine.Close()
for _, keyStack := range in {
for _, version := range keyStack.Versions {
require.NoError(t, engine.PutRawMVCCRangeKey(keyStack.AsRangeKey(version), []byte{}))
}
}
require.NoError(t, engine.Flush())
keys, rKeys := exportAllData(t, engine, exportAllQuery)
require.Equal(t, 0, len(keys))
return rKeys
}

assertExportEqualWithOptions := func(t *testing.T, ctx context.Context, engine Engine,
expectedKeys []MVCCKey,
expectedRangeKeys []MVCCRangeKeyStack,
initialOpts MVCCExportOptions) {

keysIndex := 0
rKeysBuf := []MVCCRangeKeyStack{}

startKey := initialOpts.StartKey
for len(startKey.Key) > 0 {
var sstFile bytes.Buffer
opts := initialOpts
opts.StartKey = startKey
_, resumeInfo, err := MVCCExportToSST(ctx, st, engine, opts, &sstFile)
require.NoError(t, err)
chunk := sstToKeys(t, sstFile.Bytes())
require.LessOrEqual(t, len(chunk), len(expectedData)-dataIndex, "remaining test data")
for _, key := range chunk {
require.True(t, key.Equal(expectedData[dataIndex]), "returned key is not equal")
dataIndex++

keys, rangeKeys := sstToKeys(t, sstFile.Bytes())

require.LessOrEqual(t, len(keys), len(expectedKeys)-keysIndex, "remaining test key data")

for _, key := range keys {
require.True(t, key.Equal(expectedKeys[keysIndex]), "returned key is not equal")
keysIndex++
}
rKeysBuf = append(rKeysBuf, rangeKeys...)
startKey = resumeInfo.ResumeKey
}
require.Equal(t, len(expectedData), dataIndex, "not all expected data was consumed")
require.Equal(t, len(expectedKeys), keysIndex, "not all expected keys were consumed")

actualRangeKeys := canonicalizeRangeKeys(rKeysBuf)
require.Equal(t, len(expectedRangeKeys), len(actualRangeKeys))
for i, actual := range actualRangeKeys {
expected := expectedRangeKeys[i]
require.True(t, actual.Equal(expected), "range key mismatch %v != %v", actual, expected)
}

}
t.Run("elastic CPU limit exhausted",
func(t *testing.T) {
@@ -6186,13 +6239,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
tombstoneChance: 0.01,
}
generateData(t, engine, limits, (limits.maxKey-limits.minKey)*10)
data := exportAllData(t, engine, queryLimits{
minKey: minKey,
maxKey: maxKey,
minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp,
latest: false,
})
keys, rKeys := exportAllData(t, engine, exportAllQuery)

// Our ElasticCPUWorkHandle will fail on the very first call. As a result,
// the very first return from MVCCExportToSST will actually contain no
Expand All @@ -6205,7 +6252,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
}
return false, 0
}))
assertExportEqualWithOptions(t, ctx, engine, data, MVCCExportOptions{
assertExportEqualWithOptions(t, ctx, engine, keys, rKeys, MVCCExportOptions{
StartKey: MVCCKey{Key: testKey(limits.minKey), Timestamp: limits.minTimestamp},
EndKey: testKey(limits.maxKey),
StartTS: limits.minTimestamp,
Expand All @@ -6227,26 +6274,50 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
tombstoneChance: 0.01,
}
generateData(t, engine, limits, (limits.maxKey-limits.minKey)*10)
data := exportAllData(t, engine, queryLimits{
minKey: minKey,
maxKey: maxKey,
minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp,
latest: false,
keys, rKeys := exportAllData(t, engine, exportAllQuery)

// Our ElasticCPUWorkHandle will always fail. But, we
// should still make progress, one key at a time.
ctx := admission.ContextWithElasticCPUWorkHandle(context.Background(), admission.TestingNewElasticCPUHandleWithCallback(func() (bool, time.Duration) {
return true, 0
}))
assertExportEqualWithOptions(t, ctx, engine, keys, rKeys, MVCCExportOptions{
StartKey: MVCCKey{Key: testKey(limits.minKey), Timestamp: limits.minTimestamp},
EndKey: testKey(limits.maxKey),
StartTS: limits.minTimestamp,
EndTS: limits.maxTimestamp,
ExportAllRevisions: true,
})
})
t.Run("elastic CPU limit always exhausted with range keys",
func(t *testing.T) {
engine := createTestPebbleEngine()
defer engine.Close()
limits := dataLimits{
minKey: minKey,
maxKey: maxKey,
minTimestamp: minTimestamp,
maxTimestamp: maxTimestamp,
tombstoneChance: 0.50,
useRangeTombstones: true,
}
// Adding many range keys makes this test much slower,
// so we use 2*keyRange rather than 10*keyRange here.
generateData(t, engine, limits, (limits.maxKey-limits.minKey)*2)
keys, rKeys := exportAllData(t, engine, exportAllQuery)

// Our ElasticCPUWorkHandle will always
// fail. But, we should still make progress,
// one key at a time.
// Our ElasticCPUWorkHandle will always fail. But, we
// should still make progress, one key at a time.
ctx := admission.ContextWithElasticCPUWorkHandle(context.Background(), admission.TestingNewElasticCPUHandleWithCallback(func() (bool, time.Duration) {
return false, 0
return true, 0
}))
assertExportEqualWithOptions(t, ctx, engine, data, MVCCExportOptions{
assertExportEqualWithOptions(t, ctx, engine, keys, rKeys, MVCCExportOptions{
StartKey: MVCCKey{Key: testKey(limits.minKey), Timestamp: limits.minTimestamp},
EndKey: testKey(limits.maxKey),
StartTS: limits.minTimestamp,
EndTS: limits.maxTimestamp,
ExportAllRevisions: true,
StopMidKey: true,
})
})
t.Run("elastic CPU limit exhausted respects StopMidKey",
@@ -6299,7 +6370,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
// revisions or 0 revisions.
_, _, err := MVCCExportToSST(ctx, st, engine, opts, &sstFile)
require.NoError(t, err)
chunk := sstToKeys(t, sstFile.Bytes())
chunk, _ := sstToKeys(t, sstFile.Bytes())
require.Equal(t, 6, len(chunk))

// With StopMidKey=true, we can stop in the
@@ -6309,7 +6380,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
opts.StopMidKey = true
_, _, err = MVCCExportToSST(ctx, st, engine, opts, &sstFile)
require.NoError(t, err)
chunk = sstToKeys(t, sstFile.Bytes())
chunk, _ = sstToKeys(t, sstFile.Bytes())
// We expect 3 here rather than 2 because the
// first iteration never calls the handler.
require.Equal(t, 3, len(chunk))
Expand All @@ -6330,14 +6401,17 @@ func testKey(id int64) roachpb.Key {
}

type dataLimits struct {
minKey int64
maxKey int64
minTimestamp hlc.Timestamp
maxTimestamp hlc.Timestamp
tombstoneChance float64
minKey int64
maxKey int64
minTimestamp hlc.Timestamp
maxTimestamp hlc.Timestamp
tombstoneChance float64
useRangeTombstones bool
}

func exportAllData(t *testing.T, engine Engine, limits queryLimits) []MVCCKey {
func exportAllData(
t *testing.T, engine Engine, limits queryLimits,
) ([]MVCCKey, []MVCCRangeKeyStack) {
st := cluster.MakeTestingClusterSettings()
var sstFile bytes.Buffer
_, _, err := MVCCExportToSST(context.Background(), st, engine, MVCCExportOptions{
@@ -6351,9 +6425,11 @@ func exportAllData(t *testing.T, engine Engine, limits queryLimits) []MVCCKey {
return sstToKeys(t, sstFile.Bytes())
}

func sstToKeys(t *testing.T, data []byte) []MVCCKey {
func sstToKeys(t *testing.T, data []byte) ([]MVCCKey, []MVCCRangeKeyStack) {
var results []MVCCKey
var rangeKeyRes []MVCCRangeKeyStack
it, err := NewMemSSTIterator(data, false, IterOptions{
KeyTypes: pebble.IterKeyTypePointsAndRanges,
LowerBound: keys.MinKey,
UpperBound: keys.MaxKey,
})
@@ -6365,26 +6441,47 @@ func sstToKeys(t *testing.T, data []byte) []MVCCKey {
if !ok {
break
}

if it.RangeKeyChanged() {
hasPoint, hasRange := it.HasPointAndRange()
if hasRange {
rangeKeyRes = append(rangeKeyRes, it.RangeKeys().Clone())
}
if !hasPoint {
it.Next()
continue
}
}

results = append(results, MVCCKey{
Key: append(roachpb.Key(nil), it.UnsafeKey().Key...),
Timestamp: it.UnsafeKey().Timestamp,
})
it.Next()
}
return results
return results, rangeKeyRes
}

func generateData(t *testing.T, engine Engine, limits dataLimits, totalEntries int64) {
rng := rand.New(rand.NewSource(timeutil.Now().Unix()))
for i := int64(0); i < totalEntries; i++ {
key := testKey(limits.minKey + rand.Int63n(limits.maxKey-limits.minKey))
keyID := limits.minKey + rand.Int63n(limits.maxKey-limits.minKey)
key := testKey(keyID)
timestamp := limits.minTimestamp.Add(rand.Int63n(limits.maxTimestamp.WallTime-limits.minTimestamp.WallTime), 0)
size := 256
if rng.Float64() < limits.tombstoneChance {
size = 0
}
value := MVCCValue{Value: roachpb.MakeValueFromBytes(randutil.RandBytes(rng, size))}
require.NoError(t, engine.PutMVCC(MVCCKey{Key: key, Timestamp: timestamp}, value), "Write data to test storage")

if limits.useRangeTombstones && size == 0 {
require.NoError(t, engine.PutRawMVCCRangeKey(MVCCRangeKey{
StartKey: key,
EndKey: testKey(keyID + 2),
Timestamp: timestamp}, []byte{}), "write data to test storage")
} else {
value := MVCCValue{Value: roachpb.MakeValueFromBytes(randutil.RandBytes(rng, size))}
require.NoError(t, engine.PutMVCC(MVCCKey{Key: key, Timestamp: timestamp}, value), "Write data to test storage")
}
}
require.NoError(t, engine.Flush(), "Flush engine data")
}