[dbnode] Add Migrator for performing migrations and wire into fs bootstrapper (#2521)
nbroyles authored Sep 3, 2020
1 parent 4bccb6c commit c036ebf
Showing 17 changed files with 946 additions and 30 deletions.
3 changes: 2 additions & 1 deletion src/cmd/services/m3dbnode/config/bootstrap.go
@@ -217,7 +217,8 @@ func (bsc BootstrapConfiguration) New(
         SetBoostrapDataNumProcessors(fsCfg.numCPUs()).
         SetRuntimeOptionsManager(opts.RuntimeOptionsManager()).
         SetIdentifierPool(opts.IdentifierPool()).
-        SetMigrationOptions(fsCfg.migration().NewOptions())
+        SetMigrationOptions(fsCfg.migration().NewOptions()).
+        SetStorageOptions(opts)
     if err := validator.ValidateFilesystemBootstrapperOptions(fsbOpts); err != nil {
         return nil, err
     }
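Taken together, this hunk wires both the migration configuration and the node's storage options into the fs bootstrapper; the storage options are what the version 1.1 task below draws its bytes pool and database block options from. A minimal sketch of opting in to the 1.1 migration, where `NewOptions`, `SetTargetMigrationVersion`, and `MigrationVersion_1_1` are assumed names inferred from the `targetMigrationVersion` field shown in options.go further down:

```go
// Hedged sketch: opt in to fileset migrations targeting version 1.1.
// SetTargetMigrationVersion and MigrationVersion_1_1 are assumed names;
// fsbOpts and opts come from the surrounding bootstrap code above.
migrationOpts := migration.NewOptions().
    SetTargetMigrationVersion(migration.MigrationVersion_1_1)
fsbOpts = fsbOpts.
    SetMigrationOptions(migrationOpts).
    SetStorageOptions(opts)
```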
15 changes: 15 additions & 0 deletions src/dbnode/persist/fs/files.go
@@ -310,6 +310,21 @@ type SnapshotMetadataIdentifier struct {
     UUID uuid.UUID
 }
 
+// NewFileSetFileIdentifier creates a new FileSetFileIdentifier.
+func NewFileSetFileIdentifier(
+    namespace ident.ID,
+    blockStart time.Time,
+    shard uint32,
+    volumeIndex int,
+) FileSetFileIdentifier {
+    return FileSetFileIdentifier{
+        Namespace:   namespace,
+        Shard:       shard,
+        BlockStart:  blockStart,
+        VolumeIndex: volumeIndex,
+    }
+}
+
 // NewFileSetFile creates a new FileSet file
 func NewFileSetFile(id FileSetFileIdentifier, filePathPrefix string) FileSetFile {
     return FileSetFile{
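The new constructor avoids hand-building the struct literal at call sites. A small usage sketch, with illustrative namespace ID, block start, and path prefix values:

```go
// Hedged sketch: identify volume 1 of shard 7's fileset for a block start,
// then wrap it with a path prefix. Values here are illustrative stand-ins.
blockStart := time.Now().Truncate(2 * time.Hour) // assumes a 2h block size
id := fs.NewFileSetFileIdentifier(ident.StringID("metrics"), blockStart, 7, 1)
fsFile := fs.NewFileSetFile(id, "/var/lib/m3db")
_ = fsFile
```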
27 changes: 20 additions & 7 deletions src/dbnode/persist/fs/migration/migration.go
Original file line number Diff line number Diff line change
@@ -31,8 +31,9 @@ import (
 // on a fileset. This typically involves updating files in a fileset that were created by
 // a previous version of the database client.
 type Task interface {
-    // Run is the set of steps to successfully complete a migration.
-    Run() error
+    // Run is the set of steps to successfully complete a migration. Returns the potentially
+    // updated ReadInfoFileResult or an error.
+    Run() (fs.ReadInfoFileResult, error)
 }
 
 // NewTaskFn is a function that can create a new migration task.
@@ -63,7 +64,7 @@ func NewToVersion1_1Task(opts TaskOptions) (Task, error) {
 }
 
 // Run executes the steps to bring a fileset to Version 1.1.
-func (v *toVersion1_1Task) Run() error {
+func (v *toVersion1_1Task) Run() (fs.ReadInfoFileResult, error) {
     var (
         sOpts  = v.opts.StorageOptions()
         fsOpts = v.opts.FilesystemOptions()
@@ -75,7 +76,7 @@ func (v *toVersion1_1Task) Run() (fs.ReadInfoFileResult, error) {
     )
     reader, err := fs.NewReader(sOpts.BytesPool(), fsOpts)
     if err != nil {
-        return err
+        return infoFileResult, err
     }
 
     merger := newMergerFn(reader, sOpts.DatabaseBlockOptions().DatabaseBlockAllocSize(),
@@ -95,11 +96,23 @@ func (v *toVersion1_1Task) Run() error {
 
     flushPersist, err := persistManager.StartFlushPersist()
     if err != nil {
-        return err
+        return infoFileResult, err
     }
 
     // Intentionally use a noop merger here as we simply want to rewrite the same files with the current encoder which
     // will generate index files with the entry level checksums.
-    return merger.MergeAndCleanup(fsID, fs.NewNoopMergeWith(), volIndex+1, flushPersist, nsCtx,
-        &persist.NoOpColdFlushNamespace{}, false)
+    newIndex := volIndex + 1
+    if err = merger.MergeAndCleanup(fsID, fs.NewNoopMergeWith(), newIndex, flushPersist, nsCtx,
+        &persist.NoOpColdFlushNamespace{}, false); err != nil {
+        return infoFileResult, err
+    }
+
+    if err = flushPersist.DoneFlush(); err != nil {
+        return infoFileResult, err
+    }
+
+    infoFileResult.Info.VolumeIndex = newIndex
+    infoFileResult.Info.MinorVersion = 1
+
+    return infoFileResult, nil
 }
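Since Run now reports the updated ReadInfoFileResult, a caller (for example the Migrator named in the commit title) can replace its cached, pre-migration result with the returned one. A hedged sketch, where `taskOpts`, `results`, and `i` are illustrative bookkeeping rather than confirmed API:

```go
// Hedged sketch: run the 1.0 -> 1.1 task for one fileset volume and keep
// the refreshed result (volume index bumped, MinorVersion set to 1).
task, err := migration.NewToVersion1_1Task(taskOpts)
if err != nil {
    return err
}
updatedResult, err := task.Run()
if err != nil {
    return err
}
results[i] = updatedResult // drop the stale pre-migration info file result
```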
37 changes: 29 additions & 8 deletions src/dbnode/persist/fs/migration/migration_test.go
@@ -60,9 +60,7 @@ func TestToVersion1_1Run(t *testing.T) {
         fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions(), persist.FileSetFlushType)
     require.Equal(t, 1, len(results))
     infoFileResult := results[0]
-    indexFd, err := os.Open(path.Join(fsOpts.FilePathPrefix(), fmt.Sprintf("data/%s/%d/fileset-%d-0-index.db",
-        nsId.String(), shard, infoFileResult.Info.BlockStart)))
-    require.NoError(t, err)
+    indexFd := openFile(t, fsOpts, nsId, shard, infoFileResult, "index")
     oldBytes, err := ioutil.ReadAll(indexFd)
     require.NoError(t, err)
 
@@ -97,28 +95,51 @@
     task, err := NewToVersion1_1Task(opts)
     require.NoError(t, err)
 
-    err = task.Run()
+    updatedInfoFile, err := task.Run()
     require.NoError(t, err)
 
-    // Read the index entries of new volume set
-    indexFd, err = os.Open(path.Join(fsOpts.FilePathPrefix(), fmt.Sprintf("data/%s/%d/fileset-%d-1-index.db",
-        nsId.String(), shard, infoFileResult.Info.BlockStart)))
+    // Read new info file and make sure it matches results returned by task
+    newInfoFd := openFile(t, fsOpts, nsId, shard, updatedInfoFile, "info")
+
+    newInfoBytes, err := ioutil.ReadAll(newInfoFd)
+    require.NoError(t, err)
+
+    decoder := msgpack.NewDecoder(nil)
+    decoder.Reset(msgpack.NewByteDecoderStream(newInfoBytes))
+    info, err := decoder.DecodeIndexInfo()
+
+    require.Equal(t, updatedInfoFile.Info, info)
+
+    // Read the index entries of new volume set
+    indexFd = openFile(t, fsOpts, nsId, shard, updatedInfoFile, "index")
     newBytes, err := ioutil.ReadAll(indexFd)
     require.NoError(t, err)
 
     // Diff bytes of unmigrated vs migrated fileset
     require.NotEqual(t, oldBytes, newBytes)
 
     // Corrupt bytes to trip newly added checksum
-    decoder := msgpack.NewDecoder(nil)
     newBytes[len(newBytes)-1] = 1 + newBytes[len(newBytes)-1]
     decoder.Reset(msgpack.NewByteDecoderStream(newBytes))
     _, err = decoder.DecodeIndexEntry(nil)
     require.Error(t, err)
     require.Contains(t, err.Error(), "checksum mismatch")
 }
 
+func openFile(
+    t *testing.T,
+    fsOpts fs.Options,
+    nsId ident.ID,
+    shard uint32,
+    infoFileResult fs.ReadInfoFileResult,
+    fileType string,
+) *os.File {
+    indexFd, err := os.Open(path.Join(fsOpts.FilePathPrefix(), fmt.Sprintf("data/%s/%d/fileset-%d-%d-%s.db",
+        nsId.String(), shard, infoFileResult.Info.BlockStart, infoFileResult.Info.VolumeIndex, fileType)))
+    require.NoError(t, err)
+    return indexFd
+}
+
 func writeUnmigratedData(t *testing.T, filePathPrefix string, nsId ident.ID, shard uint32) fs.Options {
     // Use encoding options that will not generate entry level checksums
     eOpts := msgpack.LegacyEncodingOptions{EncodeLegacyIndexEntryVersion: msgpack.LegacyEncodingIndexEntryVersionV2}
3 changes: 2 additions & 1 deletion src/dbnode/persist/fs/migration/options.go
@@ -22,11 +22,12 @@ package migration
 
 import (
     "fmt"
+    "math"
     "runtime"
 )
 
 // defaultMigrationConcurrency is the default number of concurrent workers to perform migrations.
-var defaultMigrationConcurrency = runtime.NumCPU()
+var defaultMigrationConcurrency = int(math.Ceil(float64(runtime.NumCPU()) / 2))
 
 type options struct {
     targetMigrationVersion MigrationVersion
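Worked example of the new default: an 8-core host gets ceil(8/2) = 4 concurrent migration workers, and a 5-core host gets ceil(5/2) = 3 — roughly half the previous NumCPU default, presumably to leave headroom for the rest of the bootstrap.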
6 changes: 6 additions & 0 deletions src/dbnode/persist/fs/types.go
@@ -622,3 +622,9 @@ type Segments interface {
     AbsoluteFilePaths() []string
     BlockStart() time.Time
 }
+
+// InfoFileResultsPerShard maps shards to info files.
+type InfoFileResultsPerShard map[uint32][]ReadInfoFileResult
+
+// InfoFilesByNamespace maps a namespace to info files grouped by shard.
+type InfoFilesByNamespace map[namespace.Metadata]InfoFileResultsPerShard
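These aliases give the migrator the shape it consumes: per namespace, the info file results read for each shard. A hedged iteration sketch, assuming namespace.Metadata's ID() accessor and a map populated earlier during bootstrap:

```go
// Hedged sketch: walk info file results grouped by namespace and shard.
// How infoFiles gets populated is assumed to happen earlier in bootstrap.
var infoFiles fs.InfoFilesByNamespace
for md, byShard := range infoFiles {
    for shard, results := range byShard {
        for _, result := range results {
            fmt.Printf("ns=%s shard=%d volume=%d\n",
                md.ID().String(), shard, result.Info.VolumeIndex)
        }
    }
}
```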