Skip to content

Commit

Permalink
[dbnode] Add validation to ensure index claims is singleton (#2877)
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington committed Nov 12, 2020
1 parent a0a96a7 commit 3746bf0
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/stretchr/testify/require"
)

func TestDiskCleansupInactiveDirectories(t *testing.T) {
func TestDiskCleanupInactiveDirectories(t *testing.T) {
var resetSetup TestSetup
if testing.Short() {
t.SkipNow() // Just skip if we're doing a short run
Expand Down
4 changes: 1 addition & 3 deletions src/dbnode/integration/disk_cleanup_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,7 @@ import (
"github.com/stretchr/testify/require"
)

var (
errDataCleanupTimedOut = errors.New("cleaning up data files took too long")
)
var errDataCleanupTimedOut = errors.New("cleaning up data files took too long")

func newDataFileSetWriter(storageOpts storage.Options) (fs.DataFileSetWriter, error) {
fsOpts := storageOpts.CommitLogOptions().FilesystemOptions()
Expand Down
9 changes: 7 additions & 2 deletions src/dbnode/integration/integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ func newDefaultBootstrappableTestSetups(
SetClusterDatabaseTopologyInitializer(topologyInitializer).
SetUseTChannelClientForWriting(useTChannelClientForWriting)

if i > 0 {
// NB(bodu): Need to reset the global counter of number of index
// claim manager instances after the initial node.
persistfs.ResetIndexClaimsManagersUnsafe()
}
setup, err := NewTestSetup(t, instanceOpts, nil)
require.NoError(t, err)
topologyInitializer = setup.TopologyInitializer()
Expand Down Expand Up @@ -286,14 +291,14 @@ func newDefaultBootstrappableTestSetups(

persistMgr, err := persistfs.NewPersistManager(fsOpts)
require.NoError(t, err)
icm := persistfs.NewIndexClaimsManager(fsOpts)

bfsOpts := bfs.NewOptions().
SetResultOptions(bsOpts).
SetFilesystemOptions(fsOpts).
SetIndexOptions(storageIdxOpts).
SetCompactor(newCompactor(t, storageIdxOpts)).
SetPersistManager(persistMgr).
SetIndexClaimsManager(icm)
SetIndexClaimsManager(setup.StorageOpts().IndexClaimsManager())

fsBootstrapper, err := bfs.NewFileSystemBootstrapperProvider(bfsOpts, finalBootstrapper)
require.NoError(t, err)
Expand Down
17 changes: 14 additions & 3 deletions src/dbnode/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,10 @@ func NewTestSetup(
storageOpts = storageOpts.SetPersistManager(pm)

// Set up index claims manager
icm := fs.NewIndexClaimsManager(fsOpts)
icm, err := fs.NewIndexClaimsManager(fsOpts)
if err != nil {
return nil, err
}
storageOpts = storageOpts.SetIndexClaimsManager(icm)

// Set up repair options
Expand Down Expand Up @@ -536,6 +539,7 @@ func guessBestTruncateBlockSize(mds []namespace.Metadata) (time.Duration, bool)
// otherwise, we are guessing
return guess, true
}

func (ts *testSetup) ShouldBeEqual() bool {
return ts.assertEqual == nil
}
Expand Down Expand Up @@ -761,6 +765,10 @@ func (ts *testSetup) startServerBase(waitForBootstrap bool) error {
func (ts *testSetup) StopServer() error {
ts.doneCh <- struct{}{}

// NB(bodu): Need to reset the global counter of index claims managers after
// we've stopped the test server. This covers the restart server case.
fs.ResetIndexClaimsManagersUnsafe()

if ts.m3dbClient.DefaultSessionActive() {
session, err := ts.m3dbClient.DefaultSession()
if err != nil {
Expand Down Expand Up @@ -819,6 +827,10 @@ func (ts *testSetup) Close() {
if ts.filePathPrefix != "" {
os.RemoveAll(ts.filePathPrefix)
}

// This could get called more than once in the multi node integration test case
// but this is fine since the reset always sets the counter to 0.
fs.ResetIndexClaimsManagersUnsafe()
}

func (ts *testSetup) MustSetTickMinimumInterval(tickMinInterval time.Duration) {
Expand Down Expand Up @@ -936,7 +948,6 @@ func (ts *testSetup) InitializeBootstrappers(opts InitializeBootstrappersOptions
if err != nil {
return err
}
icm := fs.NewIndexClaimsManager(fsOpts)
storageIdxOpts := storageOpts.IndexOptions()
compactor, err := newCompactorWithErr(storageIdxOpts)
if err != nil {
Expand All @@ -947,7 +958,7 @@ func (ts *testSetup) InitializeBootstrappers(opts InitializeBootstrappersOptions
SetFilesystemOptions(fsOpts).
SetIndexOptions(storageIdxOpts).
SetPersistManager(persistMgr).
SetIndexClaimsManager(icm).
SetIndexClaimsManager(storageOpts.IndexClaimsManager()).
SetCompactor(compactor)
bs, err = bfs.NewFileSystemBootstrapperProvider(bfsOpts, bs)
if err != nil {
Expand Down
37 changes: 34 additions & 3 deletions src/dbnode/persist/fs/index_claims_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,38 @@ package fs
import (
"errors"
"sync"
"sync/atomic"
"time"

"go.uber.org/zap"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"
)

var errOutOfRetentionClaim = errors.New("out of retention index volume claim")
var (
// errMustUseSingleClaimsManager returned when a second claims manager
// created, since this is a violation of expected behavior.
errMustUseSingleClaimsManager = errors.New("not using single global claims manager")
// errOutOfRetentionClaim returned when reserving a claim that is
// out of retention.
errOutOfRetentionClaim = errors.New("out of retention index volume claim")

globalIndexClaimsManagers uint64
)

// ResetIndexClaimsManagersUnsafe should only be used from tests or integration
// tests, it resets the count of index claim managers to allow new claim
// managers to be created.
// By default this is restricted to just once instantiation since otherwise
// concurrency issues can be skipped without realizing.
func ResetIndexClaimsManagersUnsafe() {
atomic.StoreUint64(&globalIndexClaimsManagers, 0)
}

type indexClaimsManager struct {
sync.Mutex
Expand All @@ -53,13 +75,22 @@ type volumeIndexClaim struct {
// concurrent claims for volume indices per ns and block start.
// NB(bodu): There should be only a single shared index claim manager among all threads
// writing index data filesets.
func NewIndexClaimsManager(opts Options) IndexClaimsManager {
func NewIndexClaimsManager(opts Options) (IndexClaimsManager, error) {
if atomic.AddUint64(&globalIndexClaimsManagers, 1) != 1 {
err := errMustUseSingleClaimsManager
instrument.EmitAndLogInvariantViolation(opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error(err.Error())
})
return nil, err
}

return &indexClaimsManager{
filePathPrefix: opts.FilePathPrefix(),
nowFn: opts.ClockOptions().NowFn(),
volumeIndexClaims: make(map[string]map[xtime.UnixNano]volumeIndexClaim),
nextIndexFileSetVolumeIndexFn: NextIndexFileSetVolumeIndex,
}
}, nil
}

func (i *indexClaimsManager) ClaimNextIndexFileSetVolumeIndex(
Expand Down
26 changes: 24 additions & 2 deletions src/dbnode/persist/fs/index_claims_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,30 @@ import (
xtime "github.com/m3db/m3/src/x/time"
)

func newTestIndexClaimsManager(t *testing.T, opts Options) IndexClaimsManager {
// Reset the count of index claim managers.
ResetIndexClaimsManagersUnsafe()
mgr, err := NewIndexClaimsManager(opts)
require.NoError(t, err)
return mgr
}

func TestIndexClaimsManagerSingleGlobalManager(t *testing.T) {
// Reset the count of index claim managers.
ResetIndexClaimsManagersUnsafe()

// First should be able to be created easily.
_, err := NewIndexClaimsManager(testDefaultOpts)
require.NoError(t, err)

// Second should cause an error.
_, err = NewIndexClaimsManager(testDefaultOpts)
require.Error(t, err)
require.Equal(t, errMustUseSingleClaimsManager, err)
}

func TestIndexClaimsManagerConcurrentClaims(t *testing.T) {
mgr, ok := NewIndexClaimsManager(NewOptions()).(*indexClaimsManager)
mgr, ok := newTestIndexClaimsManager(t, testDefaultOpts).(*indexClaimsManager)
require.True(t, ok)

// Always return 0 for starting volume index for testing purposes.
Expand Down Expand Up @@ -77,7 +99,7 @@ func TestIndexClaimsManagerConcurrentClaims(t *testing.T) {
// TestIndexClaimsManagerOutOfRetention ensure that we both reject and delete out of
// retention index claims.
func TestIndexClaimsManagerOutOfRetention(t *testing.T) {
mgr, ok := NewIndexClaimsManager(NewOptions()).(*indexClaimsManager)
mgr, ok := newTestIndexClaimsManager(t, testDefaultOpts).(*indexClaimsManager)
require.True(t, ok)

// Always return 0 for starting volume index for testing purposes.
Expand Down
3 changes: 2 additions & 1 deletion src/dbnode/persist/fs/migration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ func TestToVersion1_1Run(t *testing.T) {
pm, err := fs.NewPersistManager(
fsOpts.SetEncodingOptions(msgpack.DefaultLegacyEncodingOptions)) // Set encoder to most up-to-date version
require.NoError(t, err)
icm := fs.NewIndexClaimsManager(fsOpts)
icm, err := fs.NewIndexClaimsManager(fsOpts)
require.NoError(t, err)

md, err := namespace.NewMetadata(nsID, namespace.NewOptions())
require.NoError(t, err)
Expand Down
14 changes: 9 additions & 5 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,12 +614,17 @@ func Run(runOpts RunOptions) {
opts = opts.SetPersistManager(pm)

// Set the index claims manager
icm := fs.NewIndexClaimsManager(fsopts)
icm, err := fs.NewIndexClaimsManager(fsopts)
if err != nil {
logger.Fatal("could not create index claims manager", zap.Error(err))
}
defer func() {
// Reset counter of index claims managers after server teardown.
fs.ResetIndexClaimsManagersUnsafe()
}()
opts = opts.SetIndexClaimsManager(icm)

var (
envCfgResults environment.ConfigureResults
)
var envCfgResults environment.ConfigureResults
if len(envConfig.Statics) == 0 {
logger.Info("creating dynamic config service client with m3cluster")

Expand Down Expand Up @@ -787,7 +792,6 @@ func Run(runOpts RunOptions) {
cfg.Client, iopts, tchannelOpts, syncCfg.TopologyInitializer,
runtimeOptsMgr, origin, protoEnabled, schemaRegistry,
syncCfg.KVStore, logger, runOpts.CustomOptions)

if err != nil {
logger.Fatal("could not create m3db client", zap.Error(err))
}
Expand Down
11 changes: 8 additions & 3 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ func newTestOptions(t require.TestingT, filePathPrefix string) Options {
fsOpts := newTestFsOptions(filePathPrefix)
pm, err := fs.NewPersistManager(fsOpts)
require.NoError(t, err)
icm := fs.NewIndexClaimsManager(fsOpts)

// Allow multiple index claim managers since need to create one
// for each file path prefix (fs options changes between tests).
fs.ResetIndexClaimsManagersUnsafe()

icm, err := fs.NewIndexClaimsManager(fsOpts)
require.NoError(t, err)
return testDefaultOpts.
SetCompactor(compactor).
SetIndexOptions(idxOpts).
Expand Down Expand Up @@ -938,8 +944,7 @@ func TestReadRunMigrations(t *testing.T) {
writeGoodFilesWithFsOpts(t, testNs1ID, testShard, newTestFsOptions(dir).SetEncodingOptions(eOpts))

opts := newTestOptions(t, dir)
icm := fs.NewIndexClaimsManager(opts.FilesystemOptions())
sOpts, closer := newTestStorageOptions(t, opts.PersistManager(), icm)
sOpts, closer := newTestStorageOptions(t, opts.PersistManager(), opts.IndexClaimsManager())
defer closer()

src, err := newFileSystemSource(opts.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ func TestNewPeersBootstrapper(t *testing.T) {
require.NoError(t, err)

fsOpts := fs.NewOptions()
icm := fs.NewIndexClaimsManager(fsOpts)
icm, err := fs.NewIndexClaimsManager(fsOpts)
require.NoError(t, err)
opts := NewOptions().
SetFilesystemOptions(fs.NewOptions()).
SetIndexOptions(idxOpts).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,13 @@ func newTestDefaultOpts(t *testing.T, ctrl *gomock.Controller) Options {
})
require.NoError(t, err)
fsOpts := fs.NewOptions()
icm := fs.NewIndexClaimsManager(fsOpts)

// Allow multiple index claim managers since need to create one
// for each file path prefix (fs options changes between tests).
fs.ResetIndexClaimsManagersUnsafe()

icm, err := fs.NewIndexClaimsManager(fsOpts)
require.NoError(t, err)
return NewOptions().
SetResultOptions(testDefaultResultOpts).
SetPersistManager(persist.NewMockManager(ctrl)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,6 @@ func TestBootstrapIndex(t *testing.T) {
require.NoError(t, err)
opts = opts.SetPersistManager(pm)

icm := fs.NewIndexClaimsManager(opts.FilesystemOptions())
opts = opts.SetIndexClaimsManager(icm)

blockSize := 2 * time.Hour
indexBlockSize := 2 * blockSize

Expand Down
7 changes: 6 additions & 1 deletion src/dbnode/storage/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,13 @@ func DefaultTestOptions() Options {

// Needs a unique index claims manager each time as it tracks volume indices via in mem claims that
// should be different per test.
fs.ResetIndexClaimsManagersUnsafe()
fsOpts := defaultTestOptions.CommitLogOptions().FilesystemOptions()
icm := fs.NewIndexClaimsManager(fsOpts)
icm, err := fs.NewIndexClaimsManager(fsOpts)
if err != nil {
panic(err)
}

return defaultTestOptions.SetIndexClaimsManager(icm)
}

Expand Down

0 comments on commit 3746bf0

Please sign in to comment.