[dbnode] Add ToVersion1_1 migration task (#2520)
nbroyles authored Aug 12, 2020
1 parent 1b7e6a7 commit fb842c5
Showing 18 changed files with 1,035 additions and 114 deletions.
33 changes: 30 additions & 3 deletions src/dbnode/persist/fs/files.go
@@ -1012,13 +1012,36 @@ func IndexSnapshotFiles(filePathPrefix string, namespace ident.ID) (FileSetFiles

// FileSetAt returns a FileSetFile for the given namespace/shard/blockStart/volume combination if it exists.
func FileSetAt(filePathPrefix string, namespace ident.ID, shard uint32, blockStart time.Time, volume int) (FileSetFile, bool, error) {
var pattern string
// If this is the initial volume, then we need to check if files were written with the legacy file naming (i.e.
// without the volume index) so that we can properly locate the fileset.
if volume == 0 {
dir := ShardDataDirPath(filePathPrefix, namespace, shard)
isLegacy, err := isFirstVolumeLegacy(dir, blockStart, checkpointFileSuffix)
// NB(nate): don't propagate ErrCheckpointFileNotFound here, as the expectation is to simply
// return an empty FileSetFile if the files do not exist.
if err == ErrCheckpointFileNotFound {
return FileSetFile{}, false, nil
} else if err != nil {
return FileSetFile{}, false, err
}

if isLegacy {
pattern = filesetFileForTime(blockStart, anyLowerCaseCharsPattern)
}
}

if len(pattern) == 0 {
pattern = filesetFileForTimeAndVolumeIndex(blockStart, volume, anyLowerCaseCharsPattern)
}

matched, err := filesetFiles(filesetFilesSelector{
fileSetType: persist.FileSetFlushType,
contentType: persist.FileSetDataContentType,
filePathPrefix: filePathPrefix,
namespace: namespace,
shard: shard,
- pattern: filesetFileForTime(blockStart, anyLowerCaseCharsPattern),
+ pattern: pattern,
})
if err != nil {
return FileSetFile{}, false, err
@@ -1615,13 +1638,17 @@ func filesetFileForTime(t time.Time, suffix string) string {
return fmt.Sprintf("%s%s%d%s%s%s", filesetFilePrefix, separator, t.UnixNano(), separator, suffix, fileSuffix)
}

func filesetFileForTimeAndVolumeIndex(t time.Time, index int, suffix string) string {
newSuffix := fmt.Sprintf("%d%s%s", index, separator, suffix)
return filesetFileForTime(t, newSuffix)
}

func filesetPathFromTimeLegacy(prefix string, t time.Time, suffix string) string {
return path.Join(prefix, filesetFileForTime(t, suffix))
}

func filesetPathFromTimeAndIndex(prefix string, t time.Time, index int, suffix string) string {
- newSuffix := fmt.Sprintf("%d%s%s", index, separator, suffix)
- return path.Join(prefix, filesetFileForTime(t, newSuffix))
+ return path.Join(prefix, filesetFileForTimeAndVolumeIndex(t, index, suffix))
}

// isFirstVolumeLegacy returns whether the first volume of the provided type is
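For context on the two naming schemes: below is a minimal, self-contained sketch of the filenames these helpers produce, assuming the package constants are filesetFilePrefix = "fileset", separator = "-", and fileSuffix = ".db" (the constants are not shown in this diff, so treat them as assumptions):

package main

import (
	"fmt"
	"time"
)

// Assumed values for the package constants; this diff does not show them.
const (
	filesetFilePrefix = "fileset"
	separator         = "-"
	fileSuffix        = ".db"
)

// Mirrors filesetFileForTime from the hunk above.
func filesetFileForTime(t time.Time, suffix string) string {
	return fmt.Sprintf("%s%s%d%s%s%s", filesetFilePrefix, separator, t.UnixNano(), separator, suffix, fileSuffix)
}

// Mirrors filesetFileForTimeAndVolumeIndex from the hunk above.
func filesetFileForTimeAndVolumeIndex(t time.Time, index int, suffix string) string {
	newSuffix := fmt.Sprintf("%d%s%s", index, separator, suffix)
	return filesetFileForTime(t, newSuffix)
}

func main() {
	ts := time.Unix(0, 1)
	fmt.Println(filesetFileForTime(ts, "data"))                  // legacy: fileset-1-data.db
	fmt.Println(filesetFileForTimeAndVolumeIndex(ts, 0, "data")) // v1.1:   fileset-1-0-data.db
}

This is why FileSetAt above probes isFirstVolumeLegacy when volume == 0: a first volume written before this change exists under the legacy name, without the volume index.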
44 changes: 41 additions & 3 deletions src/dbnode/persist/fs/files_test.go
@@ -505,6 +505,38 @@ func TestFileSetAt(t *testing.T) {
}
}

func TestFileSetAtNonLegacy(t *testing.T) {
shard := uint32(0)
numIters := 20
dir := createDataFiles(t, dataDirName, testNs1ID, shard, numIters, true, checkpointFileSuffix)
defer os.RemoveAll(dir)

for i := 0; i < numIters; i++ {
timestamp := time.Unix(0, int64(i))
res, ok, err := FileSetAt(dir, testNs1ID, shard, timestamp, 0)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, timestamp, res.ID.BlockStart)
}
}

func TestFileSetAtNotFirstVolumeIndex(t *testing.T) {
shard := uint32(0)
numIters := 20
volumeIndex := 1
dir := createDataFilesWithVolumeIndex(t, dataDirName, testNs1ID, shard, numIters, true,
checkpointFileSuffix, volumeIndex)
defer os.RemoveAll(dir)

for i := 0; i < numIters; i++ {
timestamp := time.Unix(0, int64(i))
res, ok, err := FileSetAt(dir, testNs1ID, shard, timestamp, volumeIndex)
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, timestamp, res.ID.BlockStart)
}
}

func TestFileSetAtIgnoresWithoutCheckpoint(t *testing.T) {
shard := uint32(0)
numIters := 20
@@ -1165,8 +1197,8 @@ func createDataCheckpointFiles(t *testing.T, subDirName string, namespace ident.
return createDataFiles(t, subDirName, namespace, shard, iter, isSnapshot, checkpointFileSuffix)
}

-func createDataFiles(t *testing.T,
- subDirName string, namespace ident.ID, shard uint32, iter int, isSnapshot bool, fileSuffix string,
+func createDataFilesWithVolumeIndex(t *testing.T,
+ subDirName string, namespace ident.ID, shard uint32, iter int, isSnapshot bool, fileSuffix string, volumeIndex int,
) string {
dir := createTempDir(t)
shardDir := path.Join(dir, subDirName, namespace.String(), strconv.Itoa(int(shard)))
@@ -1175,7 +1207,7 @@ func createDataFiles(t *testing.T,
ts := time.Unix(0, int64(i))
var infoFilePath string
if isSnapshot {
- infoFilePath = filesetPathFromTimeAndIndex(shardDir, ts, 0, fileSuffix)
+ infoFilePath = filesetPathFromTimeAndIndex(shardDir, ts, volumeIndex, fileSuffix)
} else {
infoFilePath = filesetPathFromTimeLegacy(shardDir, ts, fileSuffix)
}
@@ -1193,6 +1225,12 @@
return dir
}

func createDataFiles(t *testing.T,
subDirName string, namespace ident.ID, shard uint32, iter int, isSnapshot bool, fileSuffix string,
) string {
return createDataFilesWithVolumeIndex(t, subDirName, namespace, shard, iter, isSnapshot, fileSuffix, 0)
}

type indexFileSetFileIdentifier struct {
FileSetFileIdentifier
Suffix string
31 changes: 31 additions & 0 deletions src/dbnode/persist/fs/merger.go
@@ -21,6 +21,7 @@
package fs

import (
"errors"
"io"
"time"

@@ -37,6 +38,8 @@ import (
xtime "github.com/m3db/m3/src/x/time"
)

var errMergeAndCleanupNotSupported = errors.New("function MergeAndCleanup not supported outside of bootstrapping")

type merger struct {
reader DataFileSetReader
blockAllocSize int
@@ -46,6 +49,7 @@ type merger struct {
encoderPool encoding.EncoderPool
contextPool context.Pool
nsOpts namespace.Options
filePathPrefix string
}

// NewMerger returns a new Merger. This implementation is in charge of merging
@@ -65,6 +69,7 @@ func NewMerger(
identPool ident.Pool,
encoderPool encoding.EncoderPool,
contextPool context.Pool,
filePathPrefix string,
nsOpts namespace.Options,
) Merger {
return &merger{
@@ -76,6 +81,7 @@
encoderPool: encoderPool,
contextPool: contextPool,
nsOpts: nsOpts,
filePathPrefix: filePathPrefix,
}
}

@@ -266,6 +272,31 @@ func (m *merger) Merge(
return prepared.DeferClose()
}

func (m *merger) MergeAndCleanup(
fileID FileSetFileIdentifier,
mergeWith MergeWith,
nextVolumeIndex int,
flushPreparer persist.FlushPreparer,
nsCtx namespace.Context,
onFlush persist.OnFlushSeries,
isBootstrapped bool,
) error {
if isBootstrapped {
return errMergeAndCleanupNotSupported
}

closer, err := m.Merge(fileID, mergeWith, nextVolumeIndex, flushPreparer, nsCtx, onFlush)
if err != nil {
return err
}

if err = closer(); err != nil {
return err
}

return DeleteFileSetAt(m.filePathPrefix, fileID.Namespace, fileID.Shard, fileID.BlockStart, fileID.VolumeIndex)
}

func appendBlockReadersToSegmentReaders(segReaders []xio.SegmentReader, brs []xio.BlockReader) []xio.SegmentReader {
for _, br := range brs {
segReaders = append(segReaders, br.SegmentReader)
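The new method is essentially "merge into the next volume, finalize via the returned closer, then delete the old fileset", guarded so it can only run before bootstrap completes. A self-contained sketch of that control flow in plain Go (illustrative stand-ins only; merge and deleteVolume are not m3 APIs):

package main

import (
	"errors"
	"fmt"
)

var errNotSupported = errors.New("merge-and-cleanup is only supported before bootstrap completes")

// merge stands in for Merger.Merge above: it merges the given volume into
// volume+1 and returns a closer that finalizes the new volume.
func merge(volume int) (func() error, error) {
	fmt.Printf("merging volume %d into volume %d\n", volume, volume+1)
	return func() error {
		fmt.Printf("finalized volume %d\n", volume+1)
		return nil
	}, nil
}

// deleteVolume stands in for DeleteFileSetAt above.
func deleteVolume(volume int) error {
	fmt.Printf("deleted fileset files for volume %d\n", volume)
	return nil
}

// mergeAndCleanup mirrors the control flow of merger.MergeAndCleanup:
// refuse after bootstrap, merge, finalize, then delete the old volume.
func mergeAndCleanup(volume int, isBootstrapped bool) error {
	if isBootstrapped {
		return errNotSupported
	}
	closer, err := merge(volume)
	if err != nil {
		return err
	}
	if err := closer(); err != nil {
		return err
	}
	return deleteVolume(volume)
}

func main() {
	if err := mergeAndCleanup(0, false); err != nil {
		fmt.Println("error:", err)
	}
}

TestCleanup and TestCleanupOnceBootstrapped in the next file exercise both paths against the real implementation.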
103 changes: 102 additions & 1 deletion src/dbnode/persist/fs/merger_test.go
@@ -22,10 +22,13 @@ package fs

import (
"io"
"os"
"path/filepath"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/m3db/m3/src/dbnode/digest"
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/namespace"
@@ -53,6 +56,7 @@ var (
identPool ident.Pool
encoderPool encoding.EncoderPool
contextPool context.Pool
bytesPool pool.CheckedBytesPool

startTime = time.Now().Truncate(blockSize)

@@ -86,6 +90,10 @@ func init() {
contextPool = context.NewPool(context.NewOptions().
SetContextPoolOptions(poolOpts).
SetFinalizerPoolOptions(poolOpts))
bytesPool = pool.NewCheckedBytesPool(nil, poolOpts, func(s []pool.Bucket) pool.BytesPool {
return pool.NewBytesPool(s, poolOpts)
})
bytesPool.Init()
}

func TestMergeWithIntersection(t *testing.T) {
Expand Down Expand Up @@ -427,6 +435,99 @@ func TestMergeWithNoData(t *testing.T) {
testMergeWith(t, diskData, mergeTargetData, expected)
}

func TestCleanup(t *testing.T) {
dir := createTempDir(t)
filePathPrefix := filepath.Join(dir, "")
defer os.RemoveAll(dir)

// Write fileset to disk
fsOpts := NewOptions().
SetFilePathPrefix(filePathPrefix)

md, err := namespace.NewMetadata(ident.StringID("foo"), namespace.NewOptions())
require.NoError(t, err)

blockStart := time.Now()
var shard uint32 = 1
fsId := FileSetFileIdentifier{
Namespace: md.ID(),
Shard: shard,
BlockStart: blockStart,
VolumeIndex: 0,
}
writeFilesetToDisk(t, fsId, fsOpts)

// Verify fileset exists
exists, err := DataFileSetExists(filePathPrefix, md.ID(), shard, blockStart, 0)
require.NoError(t, err)
require.True(t, exists)

// Initialize merger
reader, err := NewReader(bytesPool, fsOpts)
require.NoError(t, err)

merger := NewMerger(reader, 0, srPool, multiIterPool, identPool, encoderPool, contextPool,
filePathPrefix, namespace.NewOptions())

// Run merger
pm, err := NewPersistManager(fsOpts)
require.NoError(t, err)

preparer, err := pm.StartFlushPersist()
require.NoError(t, err)

err = merger.MergeAndCleanup(fsId, NewNoopMergeWith(), fsId.VolumeIndex+1, preparer,
namespace.NewContextFrom(md), &persist.NoOpColdFlushNamespace{}, false)
require.NoError(t, err)

// Verify old fileset gone and new one present
exists, err = DataFileSetExists(filePathPrefix, md.ID(), shard, blockStart, 0)
require.NoError(t, err)
require.False(t, exists)

exists, err = DataFileSetExists(filePathPrefix, md.ID(), shard, blockStart, 1)
require.NoError(t, err)
require.True(t, exists)
}

func TestCleanupOnceBootstrapped(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

preparer := persist.NewMockFlushPreparer(ctrl)
md, err := namespace.NewMetadata(ident.StringID("foo"), namespace.NewOptions())
require.NoError(t, err)

merger := merger{}
err = merger.MergeAndCleanup(FileSetFileIdentifier{}, NewNoopMergeWith(), 1, preparer,
namespace.NewContextFrom(md), &persist.NoOpColdFlushNamespace{}, true)
require.Error(t, err)
}

func writeFilesetToDisk(t *testing.T, fsId FileSetFileIdentifier, fsOpts Options) {
w, err := NewWriter(fsOpts)
require.NoError(t, err)

writerOpts := DataWriterOpenOptions{
Identifier: fsId,
BlockSize: 2 * time.Hour,
}
err = w.Open(writerOpts)
require.NoError(t, err)

entry := []byte{1, 2, 3}

chkdBytes := checked.NewBytes(entry, nil)
chkdBytes.IncRef()
metadata := persist.NewMetadataFromIDAndTags(ident.StringID("foo"),
ident.Tags{}, persist.MetadataOptions{})
err = w.Write(metadata, chkdBytes, digest.Checksum(entry))
require.NoError(t, err)

err = w.Close()
require.NoError(t, err)
}

func testMergeWith(
t *testing.T,
diskData *checkedBytesMap,
@@ -464,7 +565,7 @@ func testMergeWith(

nsOpts := namespace.NewOptions()
merger := NewMerger(reader, 0, srPool, multiIterPool,
- identPool, encoderPool, contextPool, nsOpts)
+ identPool, encoderPool, contextPool, NewOptions().FilePathPrefix(), nsOpts)
fsID := FileSetFileIdentifier{
Namespace: ident.StringID("test-ns"),
Shard: uint32(8),
