Skip to content

Commit

Permalink
[dbnode] No empty TSDB snapshots (#2666)
Browse files Browse the repository at this point in the history
  • Loading branch information
notbdu authored Oct 1, 2020
1 parent 06cca59 commit d5fbe4b
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 169 deletions.
51 changes: 39 additions & 12 deletions src/dbnode/integration/disk_flush_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type snapshotID struct {
}

func getLatestSnapshotVolumeIndex(
filePathPrefix string,
fsOpts fs.Options,
shardSet sharding.ShardSet,
namespace ident.ID,
blockStart time.Time,
Expand All @@ -63,7 +63,7 @@ func getLatestSnapshotVolumeIndex(

for _, shard := range shardSet.AllIDs() {
snapshotFiles, err := fs.SnapshotFiles(
filePathPrefix, namespace, shard)
fsOpts.FilePathPrefix(), namespace, shard)
if err != nil {
panic(err)
}
Expand All @@ -80,42 +80,69 @@ func getLatestSnapshotVolumeIndex(
}

func waitUntilSnapshotFilesFlushed(
filePathPrefix string,
fsOpts fs.Options,
shardSet sharding.ShardSet,
namespace ident.ID,
expectedSnapshots []snapshotID,
timeout time.Duration,
) (uuid.UUID, error) {
var snapshotID uuid.UUID
dataFlushed := func() bool {
// NB(bodu): We want to ensure that we have snapshot data that is consistent across
// ALL shards on a per block start basis. For each snapshot block start, we expect
// the data to exist in at least one shard.
expectedSnapshotsSeen := make([]bool, len(expectedSnapshots))
for _, shard := range shardSet.AllIDs() {
for _, e := range expectedSnapshots {
for i, e := range expectedSnapshots {
snapshotFiles, err := fs.SnapshotFiles(
filePathPrefix, namespace, shard)
fsOpts.FilePathPrefix(), namespace, shard)
if err != nil {
panic(err)
}

latest, ok := snapshotFiles.LatestVolumeForBlock(e.blockStart)
if !ok {
return false
// Each shard may not own data for all block starts.
continue
}

if !(latest.ID.VolumeIndex >= e.minVolume) {
return false
// Cleanup manager can lag behind.
continue
}

_, snapshotID, err = latest.SnapshotTimeAndID()
if err != nil {
panic(err)
}
// Mark expected snapshot as seen.
expectedSnapshotsSeen[i] = true
}
}
// We should have seen each expected snapshot in at least one shard.
for _, maybeSeen := range expectedSnapshotsSeen {
if !maybeSeen {
return false
}
}
return true
}
if waitUntil(dataFlushed, timeout) {
return snapshotID, nil
// Use snapshot metadata to get latest snapshotID as the view of snapshotID can be inconsistent
// across TSDB blocks.
snapshotMetadataFlushed := func() bool {
snapshotMetadatas, _, err := fs.SortedSnapshotMetadataFiles(fsOpts)
if err != nil {
panic(err)
}

if len(snapshotMetadatas) == 0 {
return false
}
snapshotID = snapshotMetadatas[len(snapshotMetadatas)-1].ID.UUID
return true
}
if waitUntil(snapshotMetadataFlushed, timeout) {
return snapshotID, nil
}
}

return snapshotID, errDiskFlushTimedOut
}

Expand Down
54 changes: 34 additions & 20 deletions src/dbnode/integration/disk_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
xtime "github.com/m3db/m3/src/x/time"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestDiskSnapshotSimple(t *testing.T) {
Expand Down Expand Up @@ -99,9 +100,9 @@ func TestDiskSnapshotSimple(t *testing.T) {
// Writes in the previous block which should be mutable due to bufferPast
{IDs: []string{"foo", "bar", "baz"}, NumPoints: 5, Start: currBlock.Add(-10 * time.Minute)},
// Writes in the current block
{IDs: []string{"a", "b", "c"}, NumPoints: 30, Start: currBlock},
{IDs: []string{"a", "b", "c"}, NumPoints: 5, Start: currBlock},
// Writes in the next block which should be mutable due to bufferFuture
{IDs: []string{"1", "2", "3"}, NumPoints: 30, Start: currBlock.Add(blockSize)},
{IDs: []string{"1", "2", "3"}, NumPoints: 5, Start: currBlock.Add(blockSize)},
}
)
for _, input := range inputData {
Expand All @@ -120,27 +121,26 @@ func TestDiskSnapshotSimple(t *testing.T) {
// the measured volume index + 1.
var (
snapshotsToWaitForByNS = make([][]snapshotID, 0, len(testSetup.Namespaces()))
filePathPrefix = testSetup.StorageOpts().
fsOpts = testSetup.StorageOpts().
CommitLogOptions().
FilesystemOptions().
FilePathPrefix()
FilesystemOptions()
)
for _, ns := range testSetup.Namespaces() {
snapshotsToWaitForByNS = append(snapshotsToWaitForByNS, []snapshotID{
{
blockStart: currBlock.Add(-blockSize),
minVolume: getLatestSnapshotVolumeIndex(
filePathPrefix, shardSet, ns.ID(), currBlock.Add(-blockSize)) + 1,
fsOpts, shardSet, ns.ID(), currBlock.Add(-blockSize)),
},
{
blockStart: currBlock,
minVolume: getLatestSnapshotVolumeIndex(
filePathPrefix, shardSet, ns.ID(), currBlock) + 1,
fsOpts, shardSet, ns.ID(), currBlock),
},
{
blockStart: currBlock.Add(blockSize),
minVolume: getLatestSnapshotVolumeIndex(
filePathPrefix, shardSet, ns.ID(), currBlock.Add(blockSize)) + 1,
fsOpts, shardSet, ns.ID(), currBlock.Add(blockSize)),
},
})
}
Expand All @@ -151,35 +151,49 @@ func TestDiskSnapshotSimple(t *testing.T) {

maxWaitTime := time.Minute
for i, ns := range testSetup.Namespaces() {
log.Info("waiting for snapshot files to flush")
_, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime)
log.Info("waiting for snapshot files to flush",
zap.Any("ns", ns.ID()))
_, err := waitUntilSnapshotFilesFlushed(fsOpts, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime)
require.NoError(t, err)
log.Info("verifying snapshot files")
log.Info("verifying snapshot files",
zap.Any("ns", ns.ID()))
verifySnapshottedDataFiles(t, shardSet, testSetup.StorageOpts(), ns.ID(), seriesMaps)
}

var (
oldTime = testSetup.NowFn()()
newTime = oldTime.Add(blockSize * 2)
newTime = testSetup.NowFn()().Add(blockSize * 2)
)
testSetup.SetNowFn(newTime)

for _, ns := range testSetup.Namespaces() {
log.Info("waiting for new snapshot files to be written out")
snapshotsToWaitFor := []snapshotID{{blockStart: newTime.Truncate(blockSize)}}
log.Info("waiting for new snapshot files to be written out",
zap.Any("ns", ns.ID()))
snapshotsToWaitFor := []snapshotID{{blockStart: currBlock.Add(blockSize)}}
// NB(bodu): We need to check if a specific snapshot ID was deleted since snapshotting logic now changed
// to always snapshotting every block start w/in retention.
snapshotID, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime)
snapshotID, err := waitUntilSnapshotFilesFlushed(fsOpts, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime)
require.NoError(t, err)
log.Info("waiting for old snapshot files to be deleted")
log.Info("waiting for old snapshot files to be deleted",
zap.Any("ns", ns.ID()))
// These should be flushed to disk and snapshots should have been cleaned up.
flushedBlockStarts := []time.Time{
currBlock.Add(-blockSize),
currBlock,
}
for _, shard := range shardSet.All() {
waitUntil(func() bool {
// Increase the time each check to ensure that the filesystem processes are able to progress (some
// of them throttle themselves based on time elapsed since the previous time.)
testSetup.SetNowFn(testSetup.NowFn()().Add(10 * time.Second))
exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), snapshotID, shard.ID(), oldTime.Truncate(blockSize))
require.NoError(t, err)
return !exists
// Ensure that snapshots for flushed data blocks no longer exist.
for _, blockStart := range flushedBlockStarts {
exists, err := fs.SnapshotFileSetExistsAt(fsOpts.FilePathPrefix(), ns.ID(), snapshotID, shard.ID(), blockStart)
require.NoError(t, err)
if exists {
return false
}
}
return true
}, maxWaitTime)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) {
latestToCheck = datapoints[len(datapoints)-1].time.Add(ns1BlockSize)
timesToRestart = []time.Time{}
start = earliestToCheck
filePathPrefix = setup.StorageOpts().CommitLogOptions().FilesystemOptions().FilePathPrefix()
fsOpts = setup.StorageOpts().CommitLogOptions().FilesystemOptions()
filePathPrefix = fsOpts.FilePathPrefix()
)

// Generate randomly selected times during which the node will restart
Expand All @@ -169,7 +170,10 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) {
defer ctx.Close()

log.Info("writing datapoints")
var i int
var (
i int
snapshotBlocks = map[xtime.UnixNano]struct{}{}
)
for i = lastDatapointsIdx; i < len(datapoints); i++ {
var (
dp = datapoints[i]
Expand All @@ -186,6 +190,7 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) {
log.Warn("error writing series datapoint", zap.Error(err))
return false, err
}
snapshotBlocks[xtime.ToUnixNano(ts.Truncate(ns1BlockSize))] = struct{}{}
}
lastDatapointsIdx = i
log.Info("wrote datapoints")
Expand Down Expand Up @@ -219,20 +224,22 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) {
}
}

if input.waitForSnapshotFiles {
// We've written data if we've advanced the datapoints index.
dpsWritten := i > 0
if input.waitForSnapshotFiles && dpsWritten {
log.Info("waiting for snapshot files to be written")
now := setup.NowFn()()
var snapshotBlock time.Time
if now.Add(-bufferPast).Truncate(ns1BlockSize).Equal(now.Truncate(ns1BlockSize)) {
snapshotBlock = now.Truncate(ns1BlockSize)
} else {
snapshotBlock = now.Truncate(ns1BlockSize).Add(-ns1BlockSize)
// We only snapshot TSDB blocks that have data in them.
expectedSnapshotBlocks := make([]snapshotID, 0, len(snapshotBlocks))
for snapshotBlock := range snapshotBlocks {
expectedSnapshotBlocks = append(expectedSnapshotBlocks, snapshotID{
blockStart: snapshotBlock.ToTime(),
})
}
_, err := waitUntilSnapshotFilesFlushed(
filePathPrefix,
fsOpts,
setup.ShardSet(),
nsID,
[]snapshotID{{blockStart: snapshotBlock}},
expectedSnapshotBlocks,
maxFlushWaitTime,
)
if err != nil {
Expand Down
Loading

0 comments on commit d5fbe4b

Please sign in to comment.