diff --git a/src/dbnode/integration/disk_flush_helpers.go b/src/dbnode/integration/disk_flush_helpers.go index ce954a036c..7d3c87d400 100644 --- a/src/dbnode/integration/disk_flush_helpers.go +++ b/src/dbnode/integration/disk_flush_helpers.go @@ -40,7 +40,6 @@ import ( "github.com/m3db/m3/src/x/ident/testutil" xtime "github.com/m3db/m3/src/x/time" - "github.com/pborman/uuid" "github.com/stretchr/testify/require" ) @@ -85,8 +84,7 @@ func waitUntilSnapshotFilesFlushed( namespace ident.ID, expectedSnapshots []snapshotID, timeout time.Duration, -) (uuid.UUID, error) { - var snapshotID uuid.UUID +) error { dataFlushed := func() bool { for _, shard := range shardSet.AllIDs() { for _, e := range expectedSnapshots { @@ -104,19 +102,14 @@ func waitUntilSnapshotFilesFlushed( if !(latest.ID.VolumeIndex >= e.minVolume) { return false } - - _, snapshotID, err = latest.SnapshotTimeAndID() - if err != nil { - panic(err) - } } } return true } if waitUntil(dataFlushed, timeout) { - return snapshotID, nil + return nil } - return snapshotID, errDiskFlushTimedOut + return errDiskFlushTimedOut } func waitUntilDataFilesFlushed( diff --git a/src/dbnode/integration/disk_snapshot_test.go b/src/dbnode/integration/disk_snapshot_test.go index 5b7d886339..f6aed5eeb4 100644 --- a/src/dbnode/integration/disk_snapshot_test.go +++ b/src/dbnode/integration/disk_snapshot_test.go @@ -152,8 +152,8 @@ func TestDiskSnapshotSimple(t *testing.T) { maxWaitTime := time.Minute for i, ns := range testSetup.Namespaces() { log.Info("waiting for snapshot files to flush") - _, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime) - require.NoError(t, err) + require.NoError(t, waitUntilSnapshotFilesFlushed( + filePathPrefix, shardSet, ns.ID(), snapshotsToWaitForByNS[i], maxWaitTime)) log.Info("verifying snapshot files") verifySnapshottedDataFiles(t, shardSet, testSetup.StorageOpts(), ns.ID(), seriesMaps) } @@ -167,17 +167,15 @@ func TestDiskSnapshotSimple(t *testing.T) { for _, ns := range testSetup.Namespaces() { log.Info("waiting for new snapshot files to be written out") snapshotsToWaitFor := []snapshotID{{blockStart: newTime.Truncate(blockSize)}} - // NB(bodu): We need to check if a specific snapshot ID was deleted since snapshotting logic now changed - // to always snapshotting every block start w/in retention. - snapshotID, err := waitUntilSnapshotFilesFlushed(filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime) - require.NoError(t, err) + require.NoError(t, waitUntilSnapshotFilesFlushed( + filePathPrefix, shardSet, ns.ID(), snapshotsToWaitFor, maxWaitTime)) log.Info("waiting for old snapshot files to be deleted") for _, shard := range shardSet.All() { waitUntil(func() bool { // Increase the time each check to ensure that the filesystem processes are able to progress (some // of them throttle themselves based on time elapsed since the previous time.) testSetup.SetNowFn(testSetup.NowFn()().Add(10 * time.Second)) - exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), snapshotID, shard.ID(), oldTime.Truncate(blockSize)) + exists, err := fs.SnapshotFileSetExistsAt(filePathPrefix, ns.ID(), shard.ID(), oldTime.Truncate(blockSize)) require.NoError(t, err) return !exists }, maxWaitTime) diff --git a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go index 7e209dee63..78341889e9 100644 --- a/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go +++ b/src/dbnode/integration/fs_commitlog_snapshot_mixed_mode_read_write_prop_test.go @@ -29,8 +29,8 @@ import ( "testing" "time" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/x/context" xtime "github.com/m3db/m3/src/x/time" "go.uber.org/zap" @@ -228,7 +228,7 @@ func TestFsCommitLogMixedModeReadWriteProp(t *testing.T) { } else { snapshotBlock = now.Truncate(ns1BlockSize).Add(-ns1BlockSize) } - _, err := waitUntilSnapshotFilesFlushed( + err := waitUntilSnapshotFilesFlushed( filePathPrefix, setup.ShardSet(), nsID, diff --git a/src/dbnode/persist/fs/files.go b/src/dbnode/persist/fs/files.go index 8919d6158a..d09714a562 100644 --- a/src/dbnode/persist/fs/files.go +++ b/src/dbnode/persist/fs/files.go @@ -1441,32 +1441,17 @@ func DataFileSetExists( } // SnapshotFileSetExistsAt determines whether snapshot fileset files exist for the given namespace, shard, and block start time. -func SnapshotFileSetExistsAt( - prefix string, - namespace ident.ID, - snapshotID uuid.UUID, - shard uint32, - blockStart time.Time, -) (bool, error) { +func SnapshotFileSetExistsAt(prefix string, namespace ident.ID, shard uint32, blockStart time.Time) (bool, error) { snapshotFiles, err := SnapshotFiles(prefix, namespace, shard) if err != nil { return false, err } - latest, ok := snapshotFiles.LatestVolumeForBlock(blockStart) + _, ok := snapshotFiles.LatestVolumeForBlock(blockStart) if !ok { return false, nil } - _, latestSnapshotID, err := latest.SnapshotTimeAndID() - if err != nil { - return false, err - } - - if !uuid.Equal(latestSnapshotID, snapshotID) { - return false, nil - } - // LatestVolumeForBlock checks for a complete checkpoint file, so we don't // need to recheck it here. return true, nil diff --git a/src/dbnode/persist/fs/files_test.go b/src/dbnode/persist/fs/files_test.go index bb2cb23dab..c34f162558 100644 --- a/src/dbnode/persist/fs/files_test.go +++ b/src/dbnode/persist/fs/files_test.go @@ -889,7 +889,7 @@ func TestSnapshotFileSetExistsAt(t *testing.T) { writeOutTestSnapshot(t, dir, shard, ts, 0) - exists, err := SnapshotFileSetExistsAt(dir, testNs1ID, testSnapshotID, shard, ts) + exists, err := SnapshotFileSetExistsAt(dir, testNs1ID, shard, ts) require.NoError(t, err) require.True(t, exists) } diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index 846791067f..a730d829c4 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -141,7 +141,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // NB(xichen): disable filesystem manager before we bootstrap to minimize // the impact of file operations on bootstrapping performance - m.mediator.DisableFileOpsAndWait() + m.mediator.DisableFileOps() defer m.mediator.EnableFileOps() // Keep performing bootstraps until none pending and no error returned. diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go index 3ed656c2b7..be0a2e2a6a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source.go @@ -677,30 +677,6 @@ func (s *commitLogSource) bootstrapShardSnapshots( blockSize time.Duration, mostRecentCompleteSnapshotByBlockShard map[xtime.UnixNano]map[uint32]fs.FileSetFile, ) error { - // NB(bodu): We use info files on disk to check if a snapshot should be loaded in as cold or warm. - // We do this instead of cross refing blockstarts and current time to handle the case of bootstrapping a - // once warm block start after a node has been shut down for a long time. We consider all block starts we - // haven't flushed data for yet a warm block start. - fsOpts := s.opts.CommitLogOptions().FilesystemOptions() - readInfoFilesResults := fs.ReadInfoFiles(fsOpts.FilePathPrefix(), ns.ID(), shard, - fsOpts.InfoReaderBufferSize(), fsOpts.DecodingOptions()) - shardBlockStartsOnDisk := make(map[xtime.UnixNano]struct{}) - for _, result := range readInfoFilesResults { - if err := result.Err.Error(); err != nil { - // If we couldn't read the info files then keep going to be consistent - // with the way the db shard updates its flush states in UpdateFlushStates(). - s.log.Error("unable to read info files in commit log bootstrap", - zap.Uint32("shard", shard), - zap.Stringer("namespace", ns.ID()), - zap.String("filepath", result.Err.Filepath()), - zap.Error(err)) - continue - } - info := result.Info - at := xtime.FromNanoseconds(info.BlockStart) - shardBlockStartsOnDisk[xtime.ToUnixNano(at)] = struct{}{} - } - rangeIter := shardTimeRanges.Iter() for rangeIter.Next() { var ( @@ -733,13 +709,9 @@ func (s *commitLogSource) bootstrapShardSnapshots( continue } - writeType := series.WarmWrite - if _, ok := shardBlockStartsOnDisk[xtime.ToUnixNano(blockStart)]; ok { - writeType = series.ColdWrite - } if err := s.bootstrapShardBlockSnapshot( ns, accumulator, shard, blockStart, blockSize, - mostRecentCompleteSnapshotForShardBlock, writeType); err != nil { + mostRecentCompleteSnapshotForShardBlock); err != nil { return err } } @@ -755,7 +727,6 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( blockStart time.Time, blockSize time.Duration, mostRecentCompleteSnapshot fs.FileSetFile, - writeType series.WriteType, ) error { var ( bOpts = s.opts.ResultOptions() @@ -835,7 +806,7 @@ func (s *commitLogSource) bootstrapShardBlockSnapshot( } // Load into series. - if err := ref.Series.LoadBlock(dbBlock, writeType); err != nil { + if err := ref.Series.LoadBlock(dbBlock, series.WarmWrite); err != nil { return err } diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 95337e996f..901ecf29be 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -58,7 +58,7 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(namespaces, nil) m := NewMockdatabaseMediator(ctrl) - m.EXPECT().DisableFileOpsAndWait() + m.EXPECT().DisableFileOps() m.EXPECT().EnableFileOps().AnyTimes() bsm := newBootstrapManager(db, m, opts).(*bootstrapManager) @@ -101,7 +101,7 @@ func TestDatabaseBootstrapSubsequentCallsQueued(t *testing.T) { })) m := NewMockdatabaseMediator(ctrl) - m.EXPECT().DisableFileOpsAndWait() + m.EXPECT().DisableFileOps() m.EXPECT().EnableFileOps().AnyTimes() db := NewMockdatabase(ctrl) @@ -159,7 +159,7 @@ func TestDatabaseBootstrapBootstrapHooks(t *testing.T) { })) m := NewMockdatabaseMediator(ctrl) - m.EXPECT().DisableFileOpsAndWait() + m.EXPECT().DisableFileOps() m.EXPECT().EnableFileOps().AnyTimes() db := NewMockdatabase(ctrl) diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index d91db51329..7bdc74fe26 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -70,15 +70,13 @@ type cleanupManager struct { deleteFilesFn deleteFilesFn deleteInactiveDirectoriesFn deleteInactiveDirectoriesFn - warmFlushCleanupInProgress bool - coldFlushCleanupInProgress bool + cleanupInProgress bool metrics cleanupManagerMetrics logger *zap.Logger } type cleanupManagerMetrics struct { - warmFlushCleanupStatus tally.Gauge - coldFlushCleanupStatus tally.Gauge + status tally.Gauge corruptCommitlogFile tally.Counter corruptSnapshotFile tally.Counter corruptSnapshotMetadataFile tally.Counter @@ -92,8 +90,7 @@ func newCleanupManagerMetrics(scope tally.Scope) cleanupManagerMetrics { sScope := scope.SubScope("snapshot") smScope := scope.SubScope("snapshot-metadata") return cleanupManagerMetrics{ - warmFlushCleanupStatus: scope.Gauge("warm-flush-cleanup"), - coldFlushCleanupStatus: scope.Gauge("cold-flush-cleanup"), + status: scope.Gauge("cleanup"), corruptCommitlogFile: clScope.Counter("corrupt"), corruptSnapshotFile: sScope.Counter("corrupt"), corruptSnapshotMetadataFile: smScope.Counter("corrupt"), @@ -127,7 +124,7 @@ func newCleanupManager( } } -func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { +func (m *cleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { // Don't perform any cleanup if we are not boostrapped yet. if !isBootstrapped { m.logger.Debug("database is still bootstrapping, terminating cleanup") @@ -135,12 +132,12 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro } m.Lock() - m.warmFlushCleanupInProgress = true + m.cleanupInProgress = true m.Unlock() defer func() { m.Lock() - m.warmFlushCleanupInProgress = false + m.cleanupInProgress = false m.Unlock() }() @@ -150,6 +147,11 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro } multiErr := xerrors.NewMultiError() + if err := m.cleanupDataFiles(t, namespaces); err != nil { + multiErr = multiErr.Add(fmt.Errorf( + "encountered errors when cleaning up data files for %v: %v", t, err)) + } + if err := m.cleanupExpiredIndexFiles(t, namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when cleaning up index files for %v: %v", t, err)) @@ -160,6 +162,11 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro "encountered errors when cleaning up index files for %v: %v", t, err)) } + if err := m.deleteInactiveDataFiles(namespaces); err != nil { + multiErr = multiErr.Add(fmt.Errorf( + "encountered errors when deleting inactive data files for %v: %v", t, err)) + } + if err := m.deleteInactiveDataSnapshotFiles(namespaces); err != nil { multiErr = multiErr.Add(fmt.Errorf( "encountered errors when deleting inactive snapshot files for %v: %v", t, err)) @@ -178,57 +185,15 @@ func (m *cleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) erro return multiErr.FinalError() } -func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { - // Don't perform any cleanup if we are not boostrapped yet. - if !isBootstrapped { - m.logger.Debug("database is still bootstrapping, terminating cleanup") - return nil - } - - m.Lock() - m.coldFlushCleanupInProgress = true - m.Unlock() - - defer func() { - m.Lock() - m.coldFlushCleanupInProgress = false - m.Unlock() - }() - - namespaces, err := m.database.OwnedNamespaces() - if err != nil { - return err - } - - multiErr := xerrors.NewMultiError() - if err := m.cleanupDataFiles(t, namespaces); err != nil { - multiErr = multiErr.Add(fmt.Errorf( - "encountered errors when cleaning up data files for %v: %v", t, err)) - } - - if err := m.deleteInactiveDataFiles(namespaces); err != nil { - multiErr = multiErr.Add(fmt.Errorf( - "encountered errors when deleting inactive data files for %v: %v", t, err)) - } - - return multiErr.FinalError() -} func (m *cleanupManager) Report() { m.RLock() - coldFlushCleanupInProgress := m.coldFlushCleanupInProgress - warmFlushCleanupInProgress := m.warmFlushCleanupInProgress + cleanupInProgress := m.cleanupInProgress m.RUnlock() - if coldFlushCleanupInProgress { - m.metrics.coldFlushCleanupStatus.Update(1) - } else { - m.metrics.coldFlushCleanupStatus.Update(0) - } - - if warmFlushCleanupInProgress { - m.metrics.warmFlushCleanupStatus.Update(1) + if cleanupInProgress { + m.metrics.status.Update(1) } else { - m.metrics.warmFlushCleanupStatus.Update(0) + m.metrics.status.Update(0) } } diff --git a/src/dbnode/storage/cleanup_test.go b/src/dbnode/storage/cleanup_test.go index acc15dc251..ca8fe33e72 100644 --- a/src/dbnode/storage/cleanup_test.go +++ b/src/dbnode/storage/cleanup_test.go @@ -32,7 +32,6 @@ import ( "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/retention" - xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" xtest "github.com/m3db/m3/src/x/test" @@ -318,7 +317,7 @@ func TestCleanupManagerCleanupCommitlogsAndSnapshots(t *testing.T) { return nil } - err := cleanup(mgr, ts, true) + err := mgr.Cleanup(ts, true) if tc.expectErr { require.Error(t, err) } else { @@ -361,7 +360,7 @@ func TestCleanupManagerNamespaceCleanupBootstrapped(t *testing.T) { mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) idx.EXPECT().CleanupExpiredFileSets(ts).Return(nil) idx.EXPECT().CleanupDuplicateFileSets().Return(nil) - require.NoError(t, cleanup(mgr, ts, true)) + require.NoError(t, mgr.Cleanup(ts, true)) } func TestCleanupManagerNamespaceCleanupNotBootstrapped(t *testing.T) { @@ -390,7 +389,7 @@ func TestCleanupManagerNamespaceCleanupNotBootstrapped(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(nses, nil).AnyTimes() mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) - require.NoError(t, cleanup(mgr, ts, false)) + require.NoError(t, mgr.Cleanup(ts, false)) } // Test NS doesn't cleanup when flag is present @@ -423,7 +422,7 @@ func TestCleanupManagerDoesntNeedCleanup(t *testing.T) { return nil } - require.NoError(t, cleanup(mgr, ts, true)) + require.NoError(t, mgr.Cleanup(ts, true)) } func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { @@ -449,7 +448,7 @@ func TestCleanupDataAndSnapshotFileSetFiles(t *testing.T) { db.EXPECT().OwnedNamespaces().Return(namespaces, nil).AnyTimes() mgr := newCleanupManager(db, newNoopFakeActiveLogs(), tally.NoopScope).(*cleanupManager) - require.NoError(t, cleanup(mgr, ts, true)) + require.NoError(t, mgr.Cleanup(ts, true)) } type deleteInactiveDirectoriesCall struct { @@ -488,7 +487,7 @@ func TestDeleteInactiveDataAndSnapshotFileSetFiles(t *testing.T) { } mgr.deleteInactiveDirectoriesFn = deleteInactiveDirectoriesFn - require.NoError(t, cleanup(mgr, ts, true)) + require.NoError(t, mgr.Cleanup(ts, true)) expectedCalls := []deleteInactiveDirectoriesCall{ deleteInactiveDirectoriesCall{ @@ -533,7 +532,7 @@ func TestCleanupManagerPropagatesOwnedNamespacesError(t *testing.T) { require.NoError(t, db.Open()) require.NoError(t, db.Terminate()) - require.Error(t, cleanup(mgr, ts, true)) + require.Error(t, mgr.Cleanup(ts, true)) } func timeFor(s int64) time.Time { @@ -557,14 +556,3 @@ func newFakeActiveLogs(activeLogs persist.CommitLogFiles) fakeActiveLogs { activeLogs: activeLogs, } } - -func cleanup( - mgr databaseCleanupManager, - t time.Time, - isBootstrapped bool, -) error { - multiErr := xerrors.NewMultiError() - multiErr = multiErr.Add(mgr.WarmFlushCleanup(t, isBootstrapped)) - multiErr = multiErr.Add(mgr.ColdFlushCleanup(t, isBootstrapped)) - return multiErr.FinalError() -} diff --git a/src/dbnode/storage/coldflush.go b/src/dbnode/storage/coldflush.go deleted file mode 100644 index 17913c4530..0000000000 --- a/src/dbnode/storage/coldflush.go +++ /dev/null @@ -1,199 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package storage - -import ( - "sync" - "time" - - "github.com/m3db/m3/src/dbnode/persist" - xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/x/instrument" - - "github.com/uber-go/tally" - "go.uber.org/zap" -) - -type coldFlushManager struct { - databaseCleanupManager - sync.RWMutex - - log *zap.Logger - database database - pm persist.Manager - opts Options - // Retain using fileOpStatus here to be consistent w/ the - // filesystem manager since both are filesystem processes. - status fileOpStatus - isColdFlushing tally.Gauge - enabled bool -} - -func newColdFlushManager( - database database, - pm persist.Manager, - opts Options, -) databaseColdFlushManager { - instrumentOpts := opts.InstrumentOptions() - scope := instrumentOpts.MetricsScope().SubScope("fs") - // NB(bodu): cold flush cleanup doesn't require commit logs. - cm := newCleanupManager(database, nil, scope) - - return &coldFlushManager{ - databaseCleanupManager: cm, - log: instrumentOpts.Logger(), - database: database, - pm: pm, - opts: opts, - status: fileOpNotStarted, - isColdFlushing: scope.Gauge("cold-flush"), - enabled: true, - } -} - -func (m *coldFlushManager) Disable() fileOpStatus { - m.Lock() - status := m.status - m.enabled = false - m.Unlock() - return status -} - -func (m *coldFlushManager) Enable() fileOpStatus { - m.Lock() - status := m.status - m.enabled = true - m.Unlock() - return status -} - -func (m *coldFlushManager) Status() fileOpStatus { - m.RLock() - status := m.status - m.RUnlock() - return status -} - -func (m *coldFlushManager) Run(t time.Time) bool { - m.Lock() - if !m.shouldRunWithLock() { - m.Unlock() - return false - } - m.status = fileOpInProgress - m.Unlock() - - // NB(xichen): perform data cleanup and flushing sequentially to minimize the impact of disk seeks. - // NB(r): Use invariant here since flush errors were introduced - // and not caught in CI or integration tests. - // When an invariant occurs in CI tests it panics so as to fail - // the build. - if err := m.ColdFlushCleanup(t, m.database.IsBootstrapped()); err != nil { - instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("error when cleaning up cold flush data", zap.Time("time", t), zap.Error(err)) - }) - } - if err := m.trackedColdFlush(); err != nil { - instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("error when cold flushing data", zap.Time("time", t), zap.Error(err)) - }) - } - m.Lock() - m.status = fileOpNotStarted - m.Unlock() - return true -} - -func (m *coldFlushManager) trackedColdFlush() error { - // The cold flush process will persist any data that has been "loaded" into memory via - // the Load() API but has not yet been persisted durably. As a result, if the cold flush - // process completes without error, then we want to "decrement" the number of tracked bytes - // by however many were outstanding right before the cold flush began. - // - // For example: - // t0: Load 100 bytes --> (numLoadedBytes == 100, numPendingLoadedBytes == 0) - // t1: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 100, numPendingLoadedBytes == 100) - // t2: Load 200 bytes --> (numLoadedBytes == 300, numPendingLoadedBytes == 100) - // t3: ColdFlushStart() - // t4: Load 300 bytes --> (numLoadedBytes == 600, numPendingLoadedBytes == 100) - // t5: ColdFlushEnd() - // t6: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 500, numPendingLoadedBytes == 0) - // t7: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) - // t8: ColdFlushStart() - // t9: ColdFlushError() - // t10: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) - // t11: ColdFlushStart() - // t12: ColdFlushEnd() - // t13: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 0, numPendingLoadedBytes == 0) - memTracker := m.opts.MemoryTracker() - memTracker.MarkLoadedAsPending() - - if err := m.coldFlush(); err != nil { - return err - } - - // Only decrement if the cold flush was a success. In this case, the decrement will reduce the - // value by however many bytes had been tracked when the cold flush began. - memTracker.DecPendingLoadedBytes() - return nil -} - -func (m *coldFlushManager) coldFlush() error { - namespaces, err := m.database.OwnedNamespaces() - if err != nil { - return err - } - - flushPersist, err := m.pm.StartFlushPersist() - if err != nil { - return err - } - - multiErr := xerrors.NewMultiError() - for _, ns := range namespaces { - if err = ns.ColdFlush(flushPersist); err != nil { - multiErr = multiErr.Add(err) - } - } - - multiErr = multiErr.Add(flushPersist.DoneFlush()) - err = multiErr.FinalError() - return err -} - -func (m *coldFlushManager) Report() { - m.databaseCleanupManager.Report() - - m.RLock() - status := m.status - m.RUnlock() - if status == fileOpInProgress { - m.isColdFlushing.Update(1) - } else { - m.isColdFlushing.Update(0) - } -} - -func (m *coldFlushManager) shouldRunWithLock() bool { - return m.enabled && m.status != fileOpInProgress && m.database.IsBootstrapped() -} diff --git a/src/dbnode/storage/coldflush_test.go b/src/dbnode/storage/coldflush_test.go deleted file mode 100644 index 55eb3d355c..0000000000 --- a/src/dbnode/storage/coldflush_test.go +++ /dev/null @@ -1,118 +0,0 @@ -// Copyright (c) 2020 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package storage - -import ( - "errors" - "sync" - "testing" - "time" - - "github.com/golang/mock/gomock" - "github.com/m3db/m3/src/dbnode/persist" - "github.com/stretchr/testify/require" -) - -func TestColdFlushManagerFlushAlreadyInProgress(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var ( - mockPersistManager = persist.NewMockManager(ctrl) - mockFlushPersist = persist.NewMockFlushPreparer(ctrl) - - // Channels used to coordinate cold flushing - startCh = make(chan struct{}, 1) - doneCh = make(chan struct{}, 1) - ) - defer func() { - close(startCh) - close(doneCh) - }() - - mockFlushPersist.EXPECT().DoneFlush().Return(nil) - mockPersistManager.EXPECT().StartFlushPersist().Do(func() { - startCh <- struct{}{} - <-doneCh - }).Return(mockFlushPersist, nil) - - testOpts := DefaultTestOptions().SetPersistManager(mockPersistManager) - db := newMockdatabase(ctrl) - db.EXPECT().Options().Return(testOpts).AnyTimes() - db.EXPECT().IsBootstrapped().Return(true).AnyTimes() - db.EXPECT().OwnedNamespaces().Return(nil, nil).AnyTimes() - - cfm := newColdFlushManager(db, mockPersistManager, testOpts).(*coldFlushManager) - cfm.pm = mockPersistManager - - var ( - wg sync.WaitGroup - now = time.Unix(0, 0) - ) - wg.Add(2) - - // Goroutine 1 should successfully flush. - go func() { - defer wg.Done() - require.True(t, cfm.Run(now)) - }() - - // Goroutine 2 should indicate already flushing. - go func() { - defer wg.Done() - - // Wait until we start the cold flushing process. - <-startCh - - // Ensure it doesn't allow a parallel flush. - require.False(t, cfm.Run(now)) - - // Allow the cold flush to finish. - doneCh <- struct{}{} - }() - - wg.Wait() - -} - -func TestColdFlushManagerFlushDoneFlushError(t *testing.T) { - ctrl := gomock.NewController(t) - defer ctrl.Finish() - - var ( - fakeErr = errors.New("fake error while marking flush done") - mockPersistManager = persist.NewMockManager(ctrl) - mockFlushPersist = persist.NewMockFlushPreparer(ctrl) - ) - - mockFlushPersist.EXPECT().DoneFlush().Return(fakeErr) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) - - testOpts := DefaultTestOptions().SetPersistManager(mockPersistManager) - db := newMockdatabase(ctrl) - db.EXPECT().Options().Return(testOpts).AnyTimes() - db.EXPECT().OwnedNamespaces().Return(nil, nil) - - cfm := newColdFlushManager(db, mockPersistManager, testOpts).(*coldFlushManager) - cfm.pm = mockPersistManager - - require.EqualError(t, fakeErr, cfm.coldFlush().Error()) -} diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index fbef19f04f..653f98d4c7 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -561,9 +561,8 @@ func (d *db) terminateWithLock() error { } func (d *db) Terminate() error { - // NB(bodu): Disable file ops waits for current fs processes to - // finish before disabling. - d.mediator.DisableFileOpsAndWait() + // NB(bodu): Wait for fs processes to finish. + d.mediator.WaitForFileSystemProcesses() d.Lock() defer d.Unlock() @@ -572,9 +571,8 @@ func (d *db) Terminate() error { } func (d *db) Close() error { - // NB(bodu): Disable file ops waits for current fs processes to - // finish before disabling. - d.mediator.DisableFileOpsAndWait() + // NB(bodu): Wait for fs processes to finish. + d.mediator.WaitForFileSystemProcesses() d.Lock() defer d.Unlock() diff --git a/src/dbnode/storage/flush.go b/src/dbnode/storage/flush.go index 73ee135117..f323d7adc9 100644 --- a/src/dbnode/storage/flush.go +++ b/src/dbnode/storage/flush.go @@ -26,7 +26,6 @@ import ( "sync" "time" - "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/retention" @@ -34,7 +33,6 @@ import ( "github.com/pborman/uuid" "github.com/uber-go/tally" - "go.uber.org/zap" ) var ( @@ -49,34 +47,11 @@ const ( // when we haven't begun either a flush or snapshot. flushManagerNotIdle flushManagerFlushInProgress + flushManagerColdFlushInProgress flushManagerSnapshotInProgress flushManagerIndexFlushInProgress ) -type flushManagerMetrics struct { - isFlushing tally.Gauge - isSnapshotting tally.Gauge - isIndexFlushing tally.Gauge - // This is a "debug" metric for making sure that the snapshotting process - // is not overly aggressive. - maxBlocksSnapshottedByNamespace tally.Gauge - dataWarmFlushDuration tally.Timer - dataSnapshotDuration tally.Timer - indexFlushDuration tally.Timer -} - -func newFlushManagerMetrics(scope tally.Scope) flushManagerMetrics { - return flushManagerMetrics{ - isFlushing: scope.Gauge("flush"), - isSnapshotting: scope.Gauge("snapshot"), - isIndexFlushing: scope.Gauge("index-flush"), - maxBlocksSnapshottedByNamespace: scope.Gauge("max-blocks-snapshotted-by-namespace"), - dataWarmFlushDuration: scope.Timer("data-warm-flush-duration"), - dataSnapshotDuration: scope.Timer("data-snapshot-duration"), - indexFlushDuration: scope.Timer("index-flush-duration"), - } -} - type flushManager struct { sync.RWMutex @@ -87,12 +62,16 @@ type flushManager struct { // state is used to protect the flush manager against concurrent use, // while flushInProgress and snapshotInProgress are more granular and // are used for emitting granular gauges. - state flushManagerState - metrics flushManagerMetrics + state flushManagerState + isFlushing tally.Gauge + isColdFlushing tally.Gauge + isSnapshotting tally.Gauge + isIndexFlushing tally.Gauge + // This is a "debug" metric for making sure that the snapshotting process + // is not overly aggressive. + maxBlocksSnapshottedByNamespace tally.Gauge lastSuccessfulSnapshotStartTime time.Time - logger *zap.Logger - nowFn clock.NowFn } func newFlushManager( @@ -102,13 +81,15 @@ func newFlushManager( ) databaseFlushManager { opts := database.Options() return &flushManager{ - database: database, - commitlog: commitlog, - opts: opts, - pm: opts.PersistManager(), - metrics: newFlushManagerMetrics(scope), - logger: opts.InstrumentOptions().Logger(), - nowFn: opts.ClockOptions().NowFn(), + database: database, + commitlog: commitlog, + opts: opts, + pm: opts.PersistManager(), + isFlushing: scope.Gauge("flush"), + isColdFlushing: scope.Gauge("cold-flush"), + isSnapshotting: scope.Gauge("snapshot"), + isIndexFlushing: scope.Gauge("index-flush"), + maxBlocksSnapshottedByNamespace: scope.Gauge("max-blocks-snapshotted-by-namespace"), } } @@ -129,16 +110,16 @@ func (m *flushManager) Flush(startTime time.Time) error { return err } - // Perform two separate loops through all the namespaces so that we can + // Perform three separate loops through all the namespaces so that we can // emit better gauges, i.e. all the flushing for all the namespaces happens - // at once then all the snapshotting. This is + // at once, then all the cold flushes, then all the snapshotting. This is // also slightly better semantically because flushing should take priority - // over snapshotting. + // over cold flushes and snapshotting. // // In addition, we need to make sure that for any given shard/blockStart - // combination, we attempt a flush before a snapshot as the snapshotting process - // will attempt to snapshot blocks w/ unflushed data which would be wasteful if - // the block is already flushable. + // combination, we attempt a flush and then a cold flush before a snapshot + // as the snapshotting process will attempt to snapshot any unflushed blocks + // which would be wasteful if the block is already flushable. multiErr := xerrors.NewMultiError() if err = m.dataWarmFlush(namespaces, startTime); err != nil { multiErr = multiErr.Add(err) @@ -146,6 +127,44 @@ func (m *flushManager) Flush(startTime time.Time) error { rotatedCommitlogID, err := m.commitlog.RotateLogs() if err == nil { + // The cold flush process will persist any data that has been "loaded" into memory via + // the Load() API but has not yet been persisted durably. As a result, if the cold flush + // process completes without error, then we want to "decrement" the number of tracked bytes + // by however many were outstanding right before the cold flush began. + // + // For example: + // t0: Load 100 bytes --> (numLoadedBytes == 100, numPendingLoadedBytes == 0) + // t1: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 100, numPendingLoadedBytes == 100) + // t2: Load 200 bytes --> (numLoadedBytes == 300, numPendingLoadedBytes == 100) + // t3: ColdFlushStart() + // t4: Load 300 bytes --> (numLoadedBytes == 600, numPendingLoadedBytes == 100) + // t5: ColdFlushEnd() + // t6: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 500, numPendingLoadedBytes == 0) + // t7: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) + // t8: ColdFlushStart() + // t9: ColdFlushError() + // t10: memTracker.MarkLoadedAsPending() --> (numLoadedBytes == 500, numPendingLoadedBytes == 500) + // t11: ColdFlushStart() + // t12: ColdFlushEnd() + // t13: memTracker.DecPendingLoadedBytes() --> (numLoadedBytes == 0, numPendingLoadedBytes == 0) + memTracker := m.opts.MemoryTracker() + memTracker.MarkLoadedAsPending() + if err = m.dataColdFlush(namespaces); err != nil { + multiErr = multiErr.Add(err) + // If cold flush fails, we can't proceed to snapshotting because + // commit log cleanup logic uses the presence of a successful + // snapshot checkpoint file to determine which commit log files are + // safe to delete. Therefore if a cold flush fails and a snapshot + // succeeds, the writes from the failed cold flush might be lost + // when commit logs get cleaned up, leaving the node in an undurable + // state such that if it restarted, it would not be able to recover + // the cold writes from its commit log. + return multiErr.FinalError() + } + // Only decrement if the cold flush was a success. In this case, the decrement will reduce the + // value by however many bytes had been tracked when the cold flush began. + memTracker.DecPendingLoadedBytes() + if err = m.dataSnapshot(namespaces, startTime, rotatedCommitlogID); err != nil { multiErr = multiErr.Add(err) } @@ -170,10 +189,7 @@ func (m *flushManager) dataWarmFlush( } m.setState(flushManagerFlushInProgress) - var ( - start = m.nowFn() - multiErr = xerrors.NewMultiError() - ) + multiErr := xerrors.NewMultiError() for _, ns := range namespaces { // Flush first because we will only snapshot if there are no outstanding flushes. flushTimes, err := m.namespaceFlushTimes(ns, startTime) @@ -192,7 +208,30 @@ func (m *flushManager) dataWarmFlush( multiErr = multiErr.Add(err) } - m.metrics.dataWarmFlushDuration.Record(m.nowFn().Sub(start)) + return multiErr.FinalError() +} + +func (m *flushManager) dataColdFlush( + namespaces []databaseNamespace, +) error { + flushPersist, err := m.pm.StartFlushPersist() + if err != nil { + return err + } + + m.setState(flushManagerColdFlushInProgress) + multiErr := xerrors.NewMultiError() + for _, ns := range namespaces { + if err = ns.ColdFlush(flushPersist); err != nil { + multiErr = multiErr.Add(err) + } + } + + err = flushPersist.DoneFlush() + if err != nil { + multiErr = multiErr.Add(err) + } + return multiErr.FinalError() } @@ -210,7 +249,6 @@ func (m *flushManager) dataSnapshot( m.setState(flushManagerSnapshotInProgress) var ( - start = m.nowFn() maxBlocksSnapshottedByNamespace = 0 multiErr = xerrors.NewMultiError() ) @@ -240,7 +278,7 @@ func (m *flushManager) dataSnapshot( } } } - m.metrics.maxBlocksSnapshottedByNamespace.Update(float64(maxBlocksSnapshottedByNamespace)) + m.maxBlocksSnapshottedByNamespace.Update(float64(maxBlocksSnapshottedByNamespace)) err = snapshotPersist.DoneSnapshot(snapshotID, rotatedCommitlogID) multiErr = multiErr.Add(err) @@ -249,7 +287,6 @@ func (m *flushManager) dataSnapshot( if finalErr == nil { m.lastSuccessfulSnapshotStartTime = startTime } - m.metrics.dataSnapshotDuration.Record(m.nowFn().Sub(start)) return finalErr } @@ -262,10 +299,7 @@ func (m *flushManager) indexFlush( } m.setState(flushManagerIndexFlushInProgress) - var ( - start = m.nowFn() - multiErr = xerrors.NewMultiError() - ) + multiErr := xerrors.NewMultiError() for _, ns := range namespaces { var ( indexOpts = ns.Options().IndexOptions() @@ -278,7 +312,6 @@ func (m *flushManager) indexFlush( } multiErr = multiErr.Add(indexFlush.DoneIndex()) - m.metrics.indexFlushDuration.Record(m.nowFn().Sub(start)) return multiErr.FinalError() } @@ -288,21 +321,27 @@ func (m *flushManager) Report() { m.RUnlock() if state == flushManagerFlushInProgress { - m.metrics.isFlushing.Update(1) + m.isFlushing.Update(1) + } else { + m.isFlushing.Update(0) + } + + if state == flushManagerColdFlushInProgress { + m.isColdFlushing.Update(1) } else { - m.metrics.isFlushing.Update(0) + m.isColdFlushing.Update(0) } if state == flushManagerSnapshotInProgress { - m.metrics.isSnapshotting.Update(1) + m.isSnapshotting.Update(1) } else { - m.metrics.isSnapshotting.Update(0) + m.isSnapshotting.Update(0) } if state == flushManagerIndexFlushInProgress { - m.metrics.isIndexFlushing.Update(1) + m.isIndexFlushing.Update(1) } else { - m.metrics.isIndexFlushing.Update(0) + m.isIndexFlushing.Update(0) } } @@ -353,8 +392,13 @@ func (m *flushManager) namespaceSnapshotTimes(ns databaseNamespace, curr time.Ti candidateTimes := timesInRange(earliest, latest, blockSize) var loopErr error return filterTimes(candidateTimes, func(t time.Time) bool { - // NB(bodu): Snapshot everything since to account for cold writes/blocks. - return true + // Snapshot anything that is unflushed. + needsFlush, err := ns.NeedsFlush(t, t) + if err != nil { + loopErr = err + return false + } + return needsFlush }), loopErr } diff --git a/src/dbnode/storage/flush_test.go b/src/dbnode/storage/flush_test.go index 1e271a7b3f..c909229839 100644 --- a/src/dbnode/storage/flush_test.go +++ b/src/dbnode/storage/flush_test.go @@ -141,12 +141,17 @@ func TestFlushManagerFlushAlreadyInProgress(t *testing.T) { // Allow the flush to finish. doneCh <- struct{}{} - // Allow the snapshot to begin and finish. + // Wait until we start the compaction process. <-startCh // Ensure it doesn't allow a parallel flush. require.Equal(t, errFlushOperationsInProgress, fm.Flush(now)) + // Allow the compaction to finish. + doneCh <- struct{}{} + + // Allow the snapshot to begin and finish. + <-startCh doneCh <- struct{}{} }() @@ -166,8 +171,11 @@ func TestFlushManagerFlushDoneFlushError(t *testing.T) { mockSnapshotPersist = persist.NewMockSnapshotPreparer(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(fakeErr) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) + gomock.InOrder( + mockFlushPersist.EXPECT().DoneFlush().Return(fakeErr), + mockFlushPersist.EXPECT().DoneFlush().Return(nil), + ) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -205,8 +213,8 @@ func TestFlushManagerNamespaceFlushTimesErr(t *testing.T) { ) // Make sure DoneFlush is called despite encountering an error, once for snapshot and once for warm flush. - mockFlushPersist.EXPECT().DoneFlush().Return(nil) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) + mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -224,6 +232,7 @@ func TestFlushManagerNamespaceFlushTimesErr(t *testing.T) { ns.EXPECT().Options().Return(nsOpts).AnyTimes() ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(false, fakeErr).AnyTimes() + ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() db.EXPECT().OwnedNamespaces().Return([]databaseNamespace{ns}, nil) @@ -250,8 +259,8 @@ func TestFlushManagerFlushDoneSnapshotError(t *testing.T) { mockSnapshotPersist = persist.NewMockSnapshotPreparer(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) + mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(fakeErr) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -285,8 +294,8 @@ func TestFlushManagerFlushDoneIndexError(t *testing.T) { mockPersistManager = persist.NewMockManager(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) + mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -321,6 +330,7 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) { ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes() ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() var ( @@ -329,8 +339,8 @@ func TestFlushManagerSkipNamespaceIndexingDisabled(t *testing.T) { mockPersistManager = persist.NewMockManager(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) + mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -364,6 +374,7 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) { ns.EXPECT().ID().Return(defaultTestNs1ID).AnyTimes() ns.EXPECT().NeedsFlush(gomock.Any(), gomock.Any()).Return(true, nil).AnyTimes() ns.EXPECT().WarmFlush(gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + ns.EXPECT().ColdFlush(gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().Snapshot(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() ns.EXPECT().FlushIndex(gomock.Any()).Return(nil) @@ -373,8 +384,8 @@ func TestFlushManagerNamespaceIndexingEnabled(t *testing.T) { mockPersistManager = persist.NewMockManager(ctrl) ) - mockFlushPersist.EXPECT().DoneFlush().Return(nil) - mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil) + mockFlushPersist.EXPECT().DoneFlush().Return(nil).Times(2) + mockPersistManager.EXPECT().StartFlushPersist().Return(mockFlushPersist, nil).Times(2) mockSnapshotPersist.EXPECT().DoneSnapshot(gomock.Any(), testCommitlogFile).Return(nil) mockPersistManager.EXPECT().StartSnapshotPersist(gomock.Any()).Return(mockSnapshotPersist, nil) @@ -542,10 +553,13 @@ func TestFlushManagerFlushSnapshot(t *testing.T) { ns.EXPECT().NeedsFlush(st, st).Return(false, nil) } + ns.EXPECT().ColdFlush(gomock.Any()) + snapshotEnd := now.Add(bufferFuture).Truncate(blockSize) num = numIntervals(start, snapshotEnd, blockSize) for i := 0; i < num; i++ { st := start.Add(time.Duration(i) * blockSize) + ns.EXPECT().NeedsFlush(st, st).Return(true, nil) ns.EXPECT().Snapshot(st, now, gomock.Any()) } } diff --git a/src/dbnode/storage/fs.go b/src/dbnode/storage/fs.go index 0fd7509982..44cd043c4a 100644 --- a/src/dbnode/storage/fs.go +++ b/src/dbnode/storage/fs.go @@ -158,7 +158,7 @@ func (m *fileSystemManager) Run( // and not caught in CI or integration tests. // When an invariant occurs in CI tests it panics so as to fail // the build. - if err := m.WarmFlushCleanup(t, m.database.IsBootstrapped()); err != nil { + if err := m.Cleanup(t, m.database.IsBootstrapped()); err != nil { instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), func(l *zap.Logger) { l.Error("error when cleaning up data", zap.Time("time", t), zap.Error(err)) diff --git a/src/dbnode/storage/fs_test.go b/src/dbnode/storage/fs_test.go index 17bff01cc6..4832808737 100644 --- a/src/dbnode/storage/fs_test.go +++ b/src/dbnode/storage/fs_test.go @@ -85,7 +85,7 @@ func TestFileSystemManagerRun(t *testing.T) { ts := time.Now() gomock.InOrder( - cm.EXPECT().WarmFlushCleanup(ts, true).Return(errors.New("foo")), + cm.EXPECT().Cleanup(ts, true).Return(errors.New("foo")), fm.EXPECT().Flush(ts).Return(errors.New("bar")), ) diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index 74bd0934dc..26b27f151c 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -26,7 +26,6 @@ import ( "time" "github.com/m3db/m3/src/dbnode/clock" - "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/x/instrument" @@ -35,7 +34,8 @@ import ( ) type ( - mediatorState int + mediatorState int + fileSystemProcessesState int ) const ( @@ -46,6 +46,9 @@ const ( mediatorNotOpen mediatorState = iota mediatorOpen mediatorClosed + + fileSystemProcessesIdle fileSystemProcessesState = iota + fileSystemProcessesBusy ) var ( @@ -77,17 +80,17 @@ type mediator struct { database database databaseBootstrapManager databaseFileSystemManager - databaseColdFlushManager databaseTickManager databaseRepairer - opts Options - nowFn clock.NowFn - sleepFn sleepFn - metrics mediatorMetrics - state mediatorState - mediatorTimeBarrier mediatorTimeBarrier - closedCh chan struct{} + opts Options + nowFn clock.NowFn + sleepFn sleepFn + metrics mediatorMetrics + state mediatorState + fileSystemProcessesState fileSystemProcessesState + mediatorTimeBarrier mediatorTimeBarrier + closedCh chan struct{} } // TODO(r): Consider renaming "databaseMediator" to "databaseCoordinator" @@ -99,31 +102,23 @@ func newMediator(database database, commitlog commitlog.CommitLog, opts Options) nowFn = opts.ClockOptions().NowFn() ) d := &mediator{ - database: database, - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - metrics: newMediatorMetrics(scope), - state: mediatorNotOpen, - mediatorTimeBarrier: newMediatorTimeBarrier(nowFn, iOpts), - closedCh: make(chan struct{}), + database: database, + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + sleepFn: time.Sleep, + metrics: newMediatorMetrics(scope), + state: mediatorNotOpen, + fileSystemProcessesState: fileSystemProcessesIdle, + mediatorTimeBarrier: newMediatorTimeBarrier(nowFn, iOpts), + closedCh: make(chan struct{}), } fsm := newFileSystemManager(database, commitlog, opts) d.databaseFileSystemManager = fsm - // NB(bodu): Cold flush needs its own persist manager now - // that its running in its own thread. - fsOpts := opts.CommitLogOptions().FilesystemOptions() - pm, err := fs.NewPersistManager(fsOpts) - if err != nil { - return nil, err - } - cfm := newColdFlushManager(database, pm, opts) - d.databaseColdFlushManager = cfm - d.databaseRepairer = newNoopDatabaseRepairer() if opts.RepairEnabled() { + var err error d.databaseRepairer, err = newDatabaseRepairer(database, opts) if err != nil { return nil, err @@ -144,39 +139,39 @@ func (m *mediator) Open() error { m.state = mediatorOpen go m.reportLoop() go m.ongoingFileSystemProcesses() - go m.ongoingColdFlushProcesses() go m.ongoingTick() m.databaseRepairer.Start() return nil } -func (m *mediator) DisableFileOpsAndWait() { +func (m *mediator) DisableFileOps() { status := m.databaseFileSystemManager.Disable() for status == fileOpInProgress { m.sleepFn(fileOpCheckInterval) status = m.databaseFileSystemManager.Status() } - // Even though the cold flush runs separately, its still - // considered a fs process. - status = m.databaseColdFlushManager.Disable() - for status == fileOpInProgress { - m.sleepFn(fileOpCheckInterval) - status = m.databaseColdFlushManager.Status() - } } func (m *mediator) EnableFileOps() { m.databaseFileSystemManager.Enable() - // Even though the cold flush runs separately, its still - // considered a fs process. - m.databaseColdFlushManager.Enable() } func (m *mediator) Report() { m.databaseBootstrapManager.Report() m.databaseRepairer.Report() m.databaseFileSystemManager.Report() - m.databaseColdFlushManager.Report() +} + +func (m *mediator) WaitForFileSystemProcesses() { + m.RLock() + fileSystemProcessesState := m.fileSystemProcessesState + m.RUnlock() + for fileSystemProcessesState == fileSystemProcessesBusy { + m.sleepFn(fileSystemProcessesCheckInterval) + m.RLock() + fileSystemProcessesState = m.fileSystemProcessesState + m.RUnlock() + } } func (m *mediator) Close() error { @@ -194,7 +189,7 @@ func (m *mediator) Close() error { return nil } -// The mediator mediates the relationship between ticks and warm flushes/snapshots. +// The mediator mediates the relationship between ticks and flushes(warm and cold)/snapshots/cleanups. // // For example, the requirements to perform a flush are: // 1) currentTime > blockStart.Add(blockSize).Add(bufferPast) @@ -226,27 +221,6 @@ func (m *mediator) ongoingFileSystemProcesses() { } } -// The mediator mediates the relationship between ticks and cold flushes/cleanup the same way it does for warm flushes/snapshots. -// We want to begin each cold/warm flush with an in sync view of time as a tick. -// NB(bodu): Cold flushes and cleanup have been separated out into it's own thread to avoid blocking snapshots. -func (m *mediator) ongoingColdFlushProcesses() { - for { - select { - case <-m.closedCh: - return - default: - m.sleepFn(tickCheckInterval) - - // Check if the mediator is already closed. - if !m.isOpen() { - return - } - - m.runColdFlushProcesses() - } - } -} - func (m *mediator) ongoingTick() { var ( log = m.opts.InstrumentOptions().Logger() @@ -282,6 +256,15 @@ func (m *mediator) ongoingTick() { } func (m *mediator) runFileSystemProcesses() { + m.Lock() + m.fileSystemProcessesState = fileSystemProcessesBusy + m.Unlock() + defer func() { + m.Lock() + m.fileSystemProcessesState = fileSystemProcessesIdle + m.Unlock() + }() + // See comment over mediatorTimeBarrier for an explanation of this logic. log := m.opts.InstrumentOptions().Logger() mediatorTime, err := m.mediatorTimeBarrier.fsProcessesWait() @@ -293,18 +276,6 @@ func (m *mediator) runFileSystemProcesses() { m.databaseFileSystemManager.Run(mediatorTime, syncRun, noForce) } -func (m *mediator) runColdFlushProcesses() { - // See comment over mediatorTimeBarrier for an explanation of this logic. - log := m.opts.InstrumentOptions().Logger() - mediatorTime, err := m.mediatorTimeBarrier.fsProcessesWait() - if err != nil { - log.Error("error within ongoingColdFlushProcesses waiting for next mediatorTime", zap.Error(err)) - return - } - - m.databaseColdFlushManager.Run(mediatorTime) -} - func (m *mediator) reportLoop() { interval := m.opts.InstrumentOptions().ReportInterval() t := time.NewTicker(interval) @@ -343,65 +314,42 @@ func (m *mediator) isOpen() bool { // This means that once a run of filesystem processes completes it will always have to wait until the currently // executing tick completes before performing the next run, but in practice this should not be much of an issue. // -// Additionally, an independent cold flush process complicates this a bit more in that we have more than one filesystem -// process waiting on the mediator barrier. The invariant here is that both warm and cold flushes always start on a tick -// with a consistent view of time as the tick it is on. They don't necessarily need to start on the same tick. See the -// diagram below for an example case. -// -// ____________ ___________ _________________ -// | Flush (t0) | | Tick (t0) | | Cold Flush (t0) | -// | | | | | | -// | | |___________| | | -// | | ___________ | | -// | | | Tick (t0) | | | -// | | | | | | -// | | |___________| | | -// | | ___________ | | -// |____________| | Tick (t0) | | | -// barrier.wait() | | | | -// |___________| | | -// mediatorTime = t1 | | -// barrier.release() | | -// ____________ ___________ | | -// | Flush (t1) | | Tick (t1) | |_________________| -// | | | | barrier.wait() +// ____________ ___________ +// | Flush (t0) | | Tick (t0) | +// | | | | // | | |___________| -// | | mediatorTime = t2 -// | | barrier.release() -// | | ___________ _________________ -// | | | Tick (t2) | | Cold Flush (t2) | -// |____________| | | | | -// barrier.wait() |___________| | | -// mediatorTime = t3 | | -// barrier.release() | | -// ____________ ___________ | | -// | Flush (t3) | | Tick (t3) | | | -// | | | | | | -// | | |___________| | | -// | | ___________ | | -// | | | Tick (t3) | | | -// | | | | | | -// | | |___________| | | -// | | ___________ | | -// |____________| | Tick (t3) | |_________________| -// barrier.wait() | | barrier.wait() -// |___________| -// mediatorTime = t4 -// barrier.release() -// ____________ ___________ _________________ -// | Flush (t4) | | Tick (t4) | | Cold Flush (t4) | -// | | | | | | -// ------------------------------------------------------------ +// | | ___________ +// | | | Tick (t0) | +// | | | | +// | | |___________| +// | | ___________ +// |____________| | Tick (t0) | +// barrier.wait() | | +// |___________| +// mediatorTime = t1 +// barrier.release() +// ------------------------------------- +// ____________ ___________ +// | Flush (t1) | | Tick (t1) | +// | | | | +// | | |___________| +// | | ___________ +// | | | Tick (t1) | +// | | | | +// | | |___________| +// | | ___________ +// |____________| | Tick (t1) | +// barrier.wait() | | +// |___________| +// barrier.release() +// ------------------------------------ type mediatorTimeBarrier struct { sync.Mutex - // Both mediatorTime and numFsProcessesWaiting are protected - // by the mutex. - mediatorTime time.Time - numFsProcessesWaiting int - - nowFn func() time.Time - iOpts instrument.Options - releaseCh chan time.Time + mediatorTime time.Time + nowFn func() time.Time + iOpts instrument.Options + fsProcessesWaiting bool + releaseCh chan time.Time } // initialMediatorTime should only be used to obtain the initial time for @@ -415,24 +363,28 @@ func (b *mediatorTimeBarrier) initialMediatorTime() time.Time { func (b *mediatorTimeBarrier) fsProcessesWait() (time.Time, error) { b.Lock() - b.numFsProcessesWaiting++ + if b.fsProcessesWaiting { + b.Unlock() + return time.Time{}, errMediatorTimeBarrierAlreadyWaiting + } + b.fsProcessesWaiting = true b.Unlock() t := <-b.releaseCh b.Lock() - b.numFsProcessesWaiting-- + b.fsProcessesWaiting = false b.Unlock() return t, nil } func (b *mediatorTimeBarrier) maybeRelease() (time.Time, error) { b.Lock() - numWaiters := b.numFsProcessesWaiting + hasWaiter := b.fsProcessesWaiting mediatorTime := b.mediatorTime b.Unlock() - if numWaiters == 0 { + if !hasWaiter { // If there isn't a waiter yet then the filesystem processes may still // be ongoing in which case we don't want to release the barrier / update // the current time yet. Allow the tick to run again with the same time @@ -453,9 +405,7 @@ func (b *mediatorTimeBarrier) maybeRelease() (time.Time, error) { } b.mediatorTime = newMediatorTime - for i := 0; i < numWaiters; i++ { - b.releaseCh <- b.mediatorTime - } + b.releaseCh <- b.mediatorTime return b.mediatorTime, nil } diff --git a/src/dbnode/storage/mediator_test.go b/src/dbnode/storage/mediator_test.go index 14056d4fca..76a7e4f78d 100644 --- a/src/dbnode/storage/mediator_test.go +++ b/src/dbnode/storage/mediator_test.go @@ -56,7 +56,7 @@ func TestDatabaseMediatorOpenClose(t *testing.T) { require.Equal(t, errMediatorAlreadyClosed, m.Close()) } -func TestDatabaseMediatorDisableFileOpsAndWait(t *testing.T) { +func TestDatabaseMediatorDisableFileOps(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -86,6 +86,6 @@ func TestDatabaseMediatorDisableFileOpsAndWait(t *testing.T) { fsm.EXPECT().Status().Return(fileOpNotStarted), ) - m.DisableFileOpsAndWait() + m.DisableFileOps() require.Equal(t, 3, len(slept)) } diff --git a/src/dbnode/storage/series/buffer.go b/src/dbnode/storage/series/buffer.go index 04f261d399..af8c742fcf 100644 --- a/src/dbnode/storage/series/buffer.go +++ b/src/dbnode/storage/series/buffer.go @@ -133,8 +133,6 @@ type databaseBuffer interface { IsEmpty() bool - IsEmptyAtBlockStart(time.Time) bool - ColdFlushBlockStarts(blockStates map[xtime.UnixNano]BlockState) OptimizedTimes Stats() bufferStats @@ -419,14 +417,6 @@ func (b *dbBuffer) IsEmpty() bool { return len(b.bucketsMap) == 0 } -func (b *dbBuffer) IsEmptyAtBlockStart(start time.Time) bool { - bv, exists := b.bucketVersionsAt(start) - if !exists { - return true - } - return bv.streamsLen() == 0 -} - func (b *dbBuffer) ColdFlushBlockStarts(blockStates map[xtime.UnixNano]BlockState) OptimizedTimes { var times OptimizedTimes diff --git a/src/dbnode/storage/series/buffer_mock.go b/src/dbnode/storage/series/buffer_mock.go index f242c84f93..a215f611f4 100644 --- a/src/dbnode/storage/series/buffer_mock.go +++ b/src/dbnode/storage/series/buffer_mock.go @@ -194,20 +194,6 @@ func (mr *MockdatabaseBufferMockRecorder) IsEmpty() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsEmpty", reflect.TypeOf((*MockdatabaseBuffer)(nil).IsEmpty)) } -// IsEmptyAtBlockStart mocks base method -func (m *MockdatabaseBuffer) IsEmptyAtBlockStart(arg0 time.Time) bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsEmptyAtBlockStart", arg0) - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsEmptyAtBlockStart indicates an expected call of IsEmptyAtBlockStart -func (mr *MockdatabaseBufferMockRecorder) IsEmptyAtBlockStart(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsEmptyAtBlockStart", reflect.TypeOf((*MockdatabaseBuffer)(nil).IsEmptyAtBlockStart), arg0) -} - // ColdFlushBlockStarts mocks base method func (m *MockdatabaseBuffer) ColdFlushBlockStarts(blockStates map[time0.UnixNano]BlockState) OptimizedTimes { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index 23b8ecaa0a..4e5931ca67 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -297,13 +297,6 @@ func (s *dbSeries) IsEmpty() bool { return false } -func (s *dbSeries) IsBufferEmptyAtBlockStart(blockStart time.Time) bool { - s.RLock() - bufferEmpty := s.buffer.IsEmptyAtBlockStart(blockStart) - s.RUnlock() - return bufferEmpty -} - func (s *dbSeries) NumActiveBlocks() int { s.RLock() value := s.cachedBlocks.Len() + s.buffer.Stats().wiredBlocks diff --git a/src/dbnode/storage/series/series_mock.go b/src/dbnode/storage/series/series_mock.go index 9ffca2a59d..c6fc05e145 100644 --- a/src/dbnode/storage/series/series_mock.go +++ b/src/dbnode/storage/series/series_mock.go @@ -163,20 +163,6 @@ func (mr *MockDatabaseSeriesMockRecorder) ID() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ID", reflect.TypeOf((*MockDatabaseSeries)(nil).ID)) } -// IsBufferEmptyAtBlockStart mocks base method -func (m *MockDatabaseSeries) IsBufferEmptyAtBlockStart(arg0 time.Time) bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsBufferEmptyAtBlockStart", arg0) - ret0, _ := ret[0].(bool) - return ret0 -} - -// IsBufferEmptyAtBlockStart indicates an expected call of IsBufferEmptyAtBlockStart -func (mr *MockDatabaseSeriesMockRecorder) IsBufferEmptyAtBlockStart(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsBufferEmptyAtBlockStart", reflect.TypeOf((*MockDatabaseSeries)(nil).IsBufferEmptyAtBlockStart), arg0) -} - // IsEmpty mocks base method func (m *MockDatabaseSeries) IsEmpty() bool { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/series/types.go b/src/dbnode/storage/series/types.go index d279979f8e..0d5f627183 100644 --- a/src/dbnode/storage/series/types.go +++ b/src/dbnode/storage/series/types.go @@ -109,13 +109,9 @@ type DatabaseSeries interface { opts FetchBlocksMetadataOptions, ) (block.FetchBlocksMetadataResult, error) - // IsEmpty returns whether series is empty (includes both cached blocks and in-mem buffer data). + // IsEmpty returns whether series is empty. IsEmpty() bool - // IsBufferEmptyAtBlockStart returns whether the series buffer is empty at block start - // (only checks for in-mem buffer data). - IsBufferEmptyAtBlockStart(time.Time) bool - // NumActiveBlocks returns the number of active blocks the series currently holds. NumActiveBlocks() int diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index e7f6a70982..ab8dd5dfb2 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -251,16 +251,11 @@ type shardFlushState struct { sync.RWMutex statesByTime map[xtime.UnixNano]fileOpState initialized bool - - // NB(bodu): Cache state on whether we snapshotted last or not to avoid - // going to disk to see if filesets are empty. - emptySnapshotOnDiskByTime map[xtime.UnixNano]bool } func newShardFlushState() shardFlushState { return shardFlushState{ - statesByTime: make(map[xtime.UnixNano]fileOpState), - emptySnapshotOnDiskByTime: make(map[xtime.UnixNano]bool), + statesByTime: make(map[xtime.UnixNano]fileOpState), } } @@ -2375,29 +2370,8 @@ func (s *dbShard) Snapshot( s.RUnlock() return errShardNotBootstrappedToSnapshot } - s.RUnlock() - var needsSnapshot bool - s.forEachShardEntry(func(entry *lookup.Entry) bool { - if !entry.Series.IsBufferEmptyAtBlockStart(blockStart) { - needsSnapshot = true - return false - } - return true - }) - // Only terminate early when we would be over-writing an empty snapshot fileset on disk. - // TODO(bodu): We could bootstrap empty snapshot state in the bs path to avoid doing extra - // snapshotting work after a bootstrap since this cached state gets cleared. - s.flushState.RLock() - // NB(bodu): This always defaults to false if the record does not exist. - emptySnapshotOnDisk := s.flushState.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] - s.flushState.RUnlock() - - if !needsSnapshot && emptySnapshotOnDisk { - return nil - } - var multiErr xerrors.MultiError prepareOpts := persist.DataPrepareOptions{ @@ -2444,24 +2418,7 @@ func (s *dbShard) Snapshot( multiErr = multiErr.Add(err) } - if err := multiErr.FinalError(); err != nil { - return err - } - - // Only update cached snapshot state if we successfully flushed data to disk. - s.flushState.Lock() - if needsSnapshot { - s.flushState.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] = false - } else { - // NB(bodu): If we flushed an empty snapshot to disk, it means that the previous - // snapshot on disk was not empty (or we just bootstrapped and cached state was lost). - // The snapshot we just flushed may or may not have data, although whatever data we flushed - // would be recoverable from the rotate commit log as well. - s.flushState.emptySnapshotOnDiskByTime[xtime.ToUnixNano(blockStart)] = true - } - s.flushState.Unlock() - - return nil + return multiErr.FinalError() } func (s *dbShard) FlushState(blockStart time.Time) (fileOpState, error) { diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 0ea3546cd8..55e0bb46c6 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -829,7 +829,6 @@ func TestShardSnapshotSeriesSnapshotSuccess(t *testing.T) { series := series.NewMockDatabaseSeries(ctrl) series.EXPECT().ID().Return(ident.StringID("foo" + strconv.Itoa(i))).AnyTimes() series.EXPECT().IsEmpty().Return(false).AnyTimes() - series.EXPECT().IsBufferEmptyAtBlockStart(blockStart).Return(false).AnyTimes() series.EXPECT(). Snapshot(gomock.Any(), blockStart, gomock.Any(), gomock.Any()). Do(func(context.Context, time.Time, persist.DataFn, namespace.Context) { diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index 85a40a0cd5..864655e539 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -2542,32 +2542,18 @@ func (m *MockdatabaseCleanupManager) EXPECT() *MockdatabaseCleanupManagerMockRec return m.recorder } -// WarmFlushCleanup mocks base method -func (m *MockdatabaseCleanupManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { +// Cleanup mocks base method +func (m *MockdatabaseCleanupManager) Cleanup(t time.Time, isBootstrapped bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WarmFlushCleanup", t, isBootstrapped) + ret := m.ctrl.Call(m, "Cleanup", t, isBootstrapped) ret0, _ := ret[0].(error) return ret0 } -// WarmFlushCleanup indicates an expected call of WarmFlushCleanup -func (mr *MockdatabaseCleanupManagerMockRecorder) WarmFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { +// Cleanup indicates an expected call of Cleanup +func (mr *MockdatabaseCleanupManagerMockRecorder) Cleanup(t, isBootstrapped interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).WarmFlushCleanup), t, isBootstrapped) -} - -// ColdFlushCleanup mocks base method -func (m *MockdatabaseCleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ColdFlushCleanup", t, isBootstrapped) - ret0, _ := ret[0].(error) - return ret0 -} - -// ColdFlushCleanup indicates an expected call of ColdFlushCleanup -func (mr *MockdatabaseCleanupManagerMockRecorder) ColdFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).ColdFlushCleanup), t, isBootstrapped) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cleanup", reflect.TypeOf((*MockdatabaseCleanupManager)(nil).Cleanup), t, isBootstrapped) } // Report mocks base method @@ -2605,6 +2591,20 @@ func (m *MockdatabaseFileSystemManager) EXPECT() *MockdatabaseFileSystemManagerM return m.recorder } +// Cleanup mocks base method +func (m *MockdatabaseFileSystemManager) Cleanup(t time.Time, isBootstrapped bool) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Cleanup", t, isBootstrapped) + ret0, _ := ret[0].(error) + return ret0 +} + +// Cleanup indicates an expected call of Cleanup +func (mr *MockdatabaseFileSystemManagerMockRecorder) Cleanup(t, isBootstrapped interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cleanup", reflect.TypeOf((*MockdatabaseFileSystemManager)(nil).Cleanup), t, isBootstrapped) +} + // Flush mocks base method func (m *MockdatabaseFileSystemManager) Flush(t time.Time) error { m.ctrl.T.Helper() @@ -2702,125 +2702,6 @@ func (mr *MockdatabaseFileSystemManagerMockRecorder) LastSuccessfulSnapshotStart return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastSuccessfulSnapshotStartTime", reflect.TypeOf((*MockdatabaseFileSystemManager)(nil).LastSuccessfulSnapshotStartTime)) } -// MockdatabaseColdFlushManager is a mock of databaseColdFlushManager interface -type MockdatabaseColdFlushManager struct { - ctrl *gomock.Controller - recorder *MockdatabaseColdFlushManagerMockRecorder -} - -// MockdatabaseColdFlushManagerMockRecorder is the mock recorder for MockdatabaseColdFlushManager -type MockdatabaseColdFlushManagerMockRecorder struct { - mock *MockdatabaseColdFlushManager -} - -// NewMockdatabaseColdFlushManager creates a new mock instance -func NewMockdatabaseColdFlushManager(ctrl *gomock.Controller) *MockdatabaseColdFlushManager { - mock := &MockdatabaseColdFlushManager{ctrl: ctrl} - mock.recorder = &MockdatabaseColdFlushManagerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockdatabaseColdFlushManager) EXPECT() *MockdatabaseColdFlushManagerMockRecorder { - return m.recorder -} - -// WarmFlushCleanup mocks base method -func (m *MockdatabaseColdFlushManager) WarmFlushCleanup(t time.Time, isBootstrapped bool) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WarmFlushCleanup", t, isBootstrapped) - ret0, _ := ret[0].(error) - return ret0 -} - -// WarmFlushCleanup indicates an expected call of WarmFlushCleanup -func (mr *MockdatabaseColdFlushManagerMockRecorder) WarmFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WarmFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).WarmFlushCleanup), t, isBootstrapped) -} - -// ColdFlushCleanup mocks base method -func (m *MockdatabaseColdFlushManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ColdFlushCleanup", t, isBootstrapped) - ret0, _ := ret[0].(error) - return ret0 -} - -// ColdFlushCleanup indicates an expected call of ColdFlushCleanup -func (mr *MockdatabaseColdFlushManagerMockRecorder) ColdFlushCleanup(t, isBootstrapped interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ColdFlushCleanup", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).ColdFlushCleanup), t, isBootstrapped) -} - -// Report mocks base method -func (m *MockdatabaseColdFlushManager) Report() { - m.ctrl.T.Helper() - m.ctrl.Call(m, "Report") -} - -// Report indicates an expected call of Report -func (mr *MockdatabaseColdFlushManagerMockRecorder) Report() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Report", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Report)) -} - -// Disable mocks base method -func (m *MockdatabaseColdFlushManager) Disable() fileOpStatus { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Disable") - ret0, _ := ret[0].(fileOpStatus) - return ret0 -} - -// Disable indicates an expected call of Disable -func (mr *MockdatabaseColdFlushManagerMockRecorder) Disable() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Disable", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Disable)) -} - -// Enable mocks base method -func (m *MockdatabaseColdFlushManager) Enable() fileOpStatus { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Enable") - ret0, _ := ret[0].(fileOpStatus) - return ret0 -} - -// Enable indicates an expected call of Enable -func (mr *MockdatabaseColdFlushManagerMockRecorder) Enable() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enable", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Enable)) -} - -// Status mocks base method -func (m *MockdatabaseColdFlushManager) Status() fileOpStatus { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Status") - ret0, _ := ret[0].(fileOpStatus) - return ret0 -} - -// Status indicates an expected call of Status -func (mr *MockdatabaseColdFlushManagerMockRecorder) Status() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Status", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Status)) -} - -// Run mocks base method -func (m *MockdatabaseColdFlushManager) Run(t time.Time) bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Run", t) - ret0, _ := ret[0].(bool) - return ret0 -} - -// Run indicates an expected call of Run -func (mr *MockdatabaseColdFlushManagerMockRecorder) Run(t interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockdatabaseColdFlushManager)(nil).Run), t) -} - // MockdatabaseShardRepairer is a mock of databaseShardRepairer interface type MockdatabaseShardRepairer struct { ctrl *gomock.Controller @@ -3064,16 +2945,16 @@ func (mr *MockdatabaseMediatorMockRecorder) Bootstrap() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Bootstrap", reflect.TypeOf((*MockdatabaseMediator)(nil).Bootstrap)) } -// DisableFileOpsAndWait mocks base method -func (m *MockdatabaseMediator) DisableFileOpsAndWait() { +// DisableFileOps mocks base method +func (m *MockdatabaseMediator) DisableFileOps() { m.ctrl.T.Helper() - m.ctrl.Call(m, "DisableFileOpsAndWait") + m.ctrl.Call(m, "DisableFileOps") } -// DisableFileOpsAndWait indicates an expected call of DisableFileOpsAndWait -func (mr *MockdatabaseMediatorMockRecorder) DisableFileOpsAndWait() *gomock.Call { +// DisableFileOps indicates an expected call of DisableFileOps +func (mr *MockdatabaseMediatorMockRecorder) DisableFileOps() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableFileOpsAndWait", reflect.TypeOf((*MockdatabaseMediator)(nil).DisableFileOpsAndWait)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DisableFileOps", reflect.TypeOf((*MockdatabaseMediator)(nil).DisableFileOps)) } // EnableFileOps mocks base method @@ -3102,6 +2983,18 @@ func (mr *MockdatabaseMediatorMockRecorder) Tick(forceType, startTime interface{ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Tick", reflect.TypeOf((*MockdatabaseMediator)(nil).Tick), forceType, startTime) } +// WaitForFileSystemProcesses mocks base method +func (m *MockdatabaseMediator) WaitForFileSystemProcesses() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "WaitForFileSystemProcesses") +} + +// WaitForFileSystemProcesses indicates an expected call of WaitForFileSystemProcesses +func (mr *MockdatabaseMediatorMockRecorder) WaitForFileSystemProcesses() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForFileSystemProcesses", reflect.TypeOf((*MockdatabaseMediator)(nil).WaitForFileSystemProcesses)) +} + // Repair mocks base method func (m *MockdatabaseMediator) Repair() error { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 0afa5fd356..68a77e2038 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -753,14 +753,9 @@ type databaseFlushManager interface { } // databaseCleanupManager manages cleaning up persistent storage space. -// NB(bodu): We have to separate flush methods since we separated out flushing into warm/cold flush -// and cleaning up certain types of data concurrently w/ either can be problematic. type databaseCleanupManager interface { - // WarmFlushCleanup cleans up data not needed in the persistent storage before a warm flush. - WarmFlushCleanup(t time.Time, isBootstrapped bool) error - - // ColdFlushCleanup cleans up data not needed in the persistent storage before a cold flush. - ColdFlushCleanup(t time.Time, isBootstrapped bool) error + // Cleanup cleans up data not needed in the persistent storage. + Cleanup(t time.Time, isBootstrapped bool) error // Report reports runtime information. Report() @@ -768,6 +763,9 @@ type databaseCleanupManager interface { // databaseFileSystemManager manages the database related filesystem activities. type databaseFileSystemManager interface { + // Cleanup cleans up data not needed in the persistent storage. + Cleanup(t time.Time, isBootstrapped bool) error + // Flush flushes in-memory data to persistent storage. Flush(t time.Time) error @@ -797,25 +795,6 @@ type databaseFileSystemManager interface { LastSuccessfulSnapshotStartTime() (time.Time, bool) } -// databaseColdFlushManager manages the database related cold flush activities. -type databaseColdFlushManager interface { - databaseCleanupManager - - // Disable disables the cold flush manager and prevents it from - // performing file operations, returns the current file operation status. - Disable() fileOpStatus - - // Enable enables the cold flush manager to perform file operations. - Enable() fileOpStatus - - // Status returns the file operation status. - Status() fileOpStatus - - // Run attempts to perform all cold flush related operations, - // returning true if those operations are performed, and false otherwise. - Run(t time.Time) bool -} - // databaseShardRepairer repairs in-memory data for a shard. type databaseShardRepairer interface { // Options returns the repair options. @@ -869,8 +848,8 @@ type databaseMediator interface { // Bootstrap bootstraps the database with file operations performed at the end. Bootstrap() (BootstrapResult, error) - // DisableFileOpsAndWait disables file operations. - DisableFileOpsAndWait() + // DisableFileOps disables file operations. + DisableFileOps() // EnableFileOps enables file operations. EnableFileOps() @@ -878,6 +857,9 @@ type databaseMediator interface { // Tick performs a tick. Tick(forceType forceType, startTime time.Time) error + // WaitForFileSystemProcesses waits for any ongoing fs processes to finish. + WaitForFileSystemProcesses() + // Repair repairs the database. Repair() error