diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index 9cf5cb3e79..3151edfcb7 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -79,6 +79,7 @@ type bootstrapManager struct { processProvider bootstrap.ProcessProvider state BootstrapState hasPending bool + pendingOnCompleteFns []BootstrapCompleteFn sleepFn sleepFn nowFn clock.NowFn lastBootstrapCompletionTime xtime.UnixNano @@ -116,23 +117,29 @@ func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) return bsTime, bsTime > 0 } -func (m *bootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult { - bootstrapAsyncResult := newBootstrapAsyncResult() - go func(r *BootstrapAsyncResult) { - if result, err := m.startBootstrap(r); err != nil && !result.AlreadyBootstrapping { +func (m *bootstrapManager) BootstrapEnqueue( + opts BootstrapEnqueueOptions, +) { + go func() { + result, err := m.startBootstrap(opts.OnCompleteFn) + if err != nil && !result.AlreadyBootstrapping { m.instrumentation.emitAndLogInvariantViolation(err, "error bootstrapping") } - }(bootstrapAsyncResult) - return bootstrapAsyncResult + }() } func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { - bootstrapAsyncResult := newBootstrapAsyncResult() - return m.startBootstrap(bootstrapAsyncResult) + return m.startBootstrap(nil) } -func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (BootstrapResult, error) { +func (m *bootstrapManager) startBootstrap( + onCompleteFn BootstrapCompleteFn, +) (BootstrapResult, error) { m.Lock() + if onCompleteFn != nil { + // Append completion fn if specified. 
+ m.pendingOnCompleteFns = append(m.pendingOnCompleteFns, onCompleteFn) + } switch m.state { case Bootstrapping: // NB(r): Already bootstrapping, now a consequent bootstrap @@ -144,9 +151,6 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo m.hasPending = true m.Unlock() result := BootstrapResult{AlreadyBootstrapping: true} - asyncResult.bootstrapResult = result - asyncResult.bootstrapStarted.Done() - asyncResult.bootstrapCompleted.Done() return result, errBootstrapEnqueued default: m.state = Bootstrapping @@ -154,16 +158,12 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo m.Unlock() // NB(xichen): disable filesystem manager before we bootstrap to minimize // the impact of file operations on bootstrapping performance + m.instrumentation.log.Info("disable fileOps and wait") m.mediator.DisableFileOpsAndWait() defer m.mediator.EnableFileOps() + m.instrumentation.log.Info("fileOps disabled") var result BootstrapResult - asyncResult.bootstrapStarted.Done() - defer func() { - asyncResult.bootstrapResult = result - asyncResult.bootstrapCompleted.Done() - }() - // Keep performing bootstraps until none pending and no error returned. for i := 0; true; i++ { // NB(r): Decouple implementation of bootstrap so can override in tests. @@ -209,7 +209,19 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo m.Lock() m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn()) m.state = Bootstrapped + // NB(r): Clear out the pending completion functions and execute them if + // needed. + pendingOnCompleteFns := m.pendingOnCompleteFns + m.pendingOnCompleteFns = nil m.Unlock() + + if len(pendingOnCompleteFns) > 0 { + // Execute any on complete functions that were queued. 
+ for _, fn := range pendingOnCompleteFns { + fn(result) + } + } + return result, nil } diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 164f955a3d..0a69ef826e 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -98,9 +98,15 @@ func testDatabaseBootstrapWithBootstrapError(t *testing.T, async bool) { var result BootstrapResult if async { - asyncResult := bsm.BootstrapEnqueue() - asyncResult.WaitForStart() - result = asyncResult.Result() + var wg sync.WaitGroup + wg.Add(1) + bsm.BootstrapEnqueue(BootstrapEnqueueOptions{ + OnCompleteFn: func(r BootstrapResult) { + result = r + wg.Done() + }, + }) + wg.Wait() } else { result, err = bsm.Bootstrap() require.NoError(t, err) diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 003e7312f4..024c173788 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -292,20 +292,14 @@ func (d *db) UpdateOwnedNamespaces(newNamespaces namespace.Map) error { } } + // NB: Can hold lock since all long-running tasks are enqueued to run + // async while holding the lock. 
 d.Lock() defer d.Unlock() removes, adds, updates := d.namespaceDeltaWithLock(newNamespaces) if err := d.logNamespaceUpdate(removes, adds, updates); err != nil { - enrichedErr := fmt.Errorf("unable to log namespace updates: %v", err) - d.log.Error(enrichedErr.Error()) - return enrichedErr - } - - // add any namespaces marked for addition - if err := d.addNamespacesWithLock(adds); err != nil { - enrichedErr := fmt.Errorf("unable to add namespaces: %v", err) - d.log.Error(enrichedErr.Error()) + d.log.Error("unable to log namespace updates", zap.Error(err)) return err } @@ -317,14 +311,68 @@ func (d *db) UpdateOwnedNamespaces(newNamespaces namespace.Map) error { "restart the process if you want changes to take effect") } - // enqueue bootstraps if new namespaces if len(adds) > 0 { - d.queueBootstrapWithLock() - } + if d.bootstraps == 0 || !d.mediatorIsOpenWithLock() { + // If no bootstraps yet or mediator is not open we can just + // add the namespaces and optionally enqueue bootstrap (which is + // async) since no file operations can be in progress when there has been + // no bootstrap and/or the mediator is not open. + if err := d.addNamespacesWithLock(adds); err != nil { + d.log.Error("unable to add namespaces", zap.Error(err)) + return err + } + + if d.bootstraps > 0 { + // If already bootstrapped before, enqueue another + // bootstrap (asynchronously, ok to trigger holding lock). + d.enqueueBootstrapAsync() + } + return nil + } + + // NB: mediator is open, so we need to disable fileOps and wait for all the background processes to complete + // so that we can update namespaces safely. Otherwise, there is a high chance of hitting an + // invariant violation panic because cold/warm flush will receive new namespaces + // in the middle of their operations. + d.Unlock() // Don't hold the lock while we wait for file ops. + d.disableFileOpsAndWait() + d.Lock() // Reacquire lock after waiting. + + // Add any namespaces marked for addition. 
+ if err := d.addNamespacesWithLock(adds); err != nil { + d.log.Error("unable to add namespaces", zap.Error(err)) + d.enableFileOps() + return err + } + + // Enqueue bootstrap and enable file ops when bootstrap is completed. + d.enqueueBootstrapAsyncWithLock(d.enableFileOps) + } return nil } +func (d *db) mediatorIsOpenWithLock() bool { + if d.mediator == nil { + return false + } + return d.mediator.IsOpen() +} + +func (d *db) disableFileOpsAndWait() { + if mediator := d.mediator; mediator != nil && mediator.IsOpen() { + d.log.Info("waiting for file ops to be disabled") + mediator.DisableFileOpsAndWait() + } +} + +func (d *db) enableFileOps() { + if mediator := d.mediator; mediator != nil && mediator.IsOpen() { + d.log.Info("enabling file ops") + mediator.EnableFileOps() + } +} + func (d *db) namespaceDeltaWithLock(newNamespaces namespace.Map) ([]ident.ID, []namespace.Metadata, []namespace.Metadata) { var ( existing = d.namespaces @@ -454,15 +502,25 @@ func (d *db) Options() Options { } func (d *db) AssignShardSet(shardSet sharding.ShardSet) { + // NB: Can hold lock since all long running tasks are enqueued to run + // async while holding the lock. d.Lock() + defer d.Unlock() + receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) if receivedNewShards { d.lastReceivedNewShards = d.nowFn() } - d.Unlock() - if !d.mediator.IsOpen() { - d.assignShardSet(shardSet) + if d.bootstraps == 0 || !d.mediatorIsOpenWithLock() { + // If not bootstrapped before or mediator is not open then can just + // immediately assign shards. + d.assignShardsWithLock(shardSet) + if d.bootstraps > 0 { + // If already bootstrapped before, enqueue another + // bootstrap (asynchronously, ok to trigger holding lock). 
+ d.enqueueBootstrapAsync() + } return } @@ -480,28 +538,34 @@ func (d *db) AssignShardSet(shardSet sharding.ShardSet) { } func (d *db) assignShardSet(shardSet sharding.ShardSet) { + d.RLock() + receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) + d.RUnlock() + + if receivedNewShards { + // Wait outside of holding lock to disable file operations. + d.disableFileOpsAndWait() + } + + // NB: Can hold lock since all long-running tasks are enqueued to run + // async while holding the lock. d.Lock() defer d.Unlock() - d.log.Info("assigning shards", zap.Uint32s("shards", shardSet.AllIDs())) + d.assignShardsWithLock(shardSet) - receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet) - d.shardSet = shardSet + if receivedNewShards { + d.enqueueBootstrapAsyncWithLock(d.enableFileOps) + } +} +func (d *db) assignShardsWithLock(shardSet sharding.ShardSet) { + d.log.Info("assigning shards", zap.Uint32s("shards", shardSet.AllIDs())) + d.shardSet = shardSet for _, elem := range d.namespaces.Iter() { ns := elem.Value() ns.AssignShardSet(shardSet) } - - if receivedNewShards { - // Only trigger a bootstrap if the node received new shards otherwise - // the nodes will perform lots of small bootstraps (that accomplish nothing) - // during topology changes as other nodes mark their shards as available. - // - // These small bootstraps can significantly delay topology changes as they prevent - // the nodes from marking themselves as bootstrapped and durable, for example. 
- d.queueBootstrapWithLock() - } } func (d *db) hasReceivedNewShardsWithLock(incoming sharding.ShardSet) bool { @@ -533,7 +597,12 @@ func (d *db) ShardSet() sharding.ShardSet { return shardSet } -func (d *db) queueBootstrapWithLock() { +func (d *db) enqueueBootstrapAsync() { + d.log.Info("enqueuing bootstrap") + d.mediator.BootstrapEnqueue(BootstrapEnqueueOptions{}) +} + +func (d *db) enqueueBootstrapAsyncWithLock(onCompleteFn func()) { // Only perform a bootstrap if at least one bootstrap has already occurred. This enables // the ability to open the clustered database and assign shardsets to the non-clustered // database when it receives an initial topology (as well as topology changes) without @@ -542,11 +611,16 @@ func (d *db) queueBootstrapWithLock() { // the non-clustered database bootstrapped by assigning it shardsets which will trigger new // bootstraps since d.bootstraps > 0 will be true. if d.bootstraps > 0 { - bootstrapAsyncResult := d.mediator.BootstrapEnqueue() - // NB(linasn): We need to wait for the bootstrap to start and set it's state to Bootstrapping in order - // to safely run fileOps in mediator later. 
- bootstrapAsyncResult.WaitForStart() + d.log.Info("enqueuing bootstrap with onComplete function") + d.mediator.BootstrapEnqueue(BootstrapEnqueueOptions{ + OnCompleteFn: func(_ BootstrapResult) { + onCompleteFn() + }, + }) + return } + + onCompleteFn() } func (d *db) Namespace(id ident.ID) (Namespace, bool) { diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index 6ea1bed3ce..134f316a62 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -425,9 +425,10 @@ func TestDatabaseAssignShardSet(t *testing.T) { require.True(t, d.lastReceivedNewShards.After(t1)) wg.Wait() + assertFileOpsEnabled(t, d) } -func TestDatabaseAssignShardSetBehaviorNoNewShards(t *testing.T) { +func TestDatabaseAssignShardSetEnqueueBootstrapWhenMediatorClosed(t *testing.T) { ctrl := xtest.NewController(t) defer ctrl.Finish() @@ -436,15 +437,45 @@ func TestDatabaseAssignShardSetBehaviorNoNewShards(t *testing.T) { close(mapCh) }() - // Set a mock mediator to be certain that bootstrap is not called when - // no new shards are assigned. - mediator := NewMockdatabaseMediator(ctrl) - mediator.EXPECT().IsOpen().Return(true) - mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).DoAndReturn(func(fn func()) error { - fn() - return nil - }) - d.mediator = mediator + mockMediator := NewMockdatabaseMediator(ctrl) + mockMediator.EXPECT().IsOpen().Return(false) + mockMediator.EXPECT().BootstrapEnqueue(BootstrapEnqueueOptions{}) + d.mediator = mockMediator + d.bootstraps = 1 + + var ns []*MockdatabaseNamespace + ns = append(ns, + dbAddNewMockNamespace(ctrl, d, "testns1"), + dbAddNewMockNamespace(ctrl, d, "testns2")) + + shards := append(sharding.NewShards([]uint32{0, 1}, shard.Available), + sharding.NewShards([]uint32{2}, shard.Initializing)...) 
+ shardSet, err := sharding.NewShardSet(shards, nil) + require.NoError(t, err) + + var wg sync.WaitGroup + wg.Add(len(ns)) + for _, n := range ns { + n.EXPECT().AssignShardSet(shardSet).Do(func(_ sharding.ShardSet) { + wg.Done() + }) + } + + t1 := d.lastReceivedNewShards + d.AssignShardSet(shardSet) + require.True(t, d.lastReceivedNewShards.After(t1)) + + wg.Wait() +} + +func TestDatabaseAssignShardSetBehaviorNoNewShards(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) + defer func() { + close(mapCh) + }() var ns []*MockdatabaseNamespace ns = append(ns, dbAddNewMockNamespace(ctrl, d, "testns1")) @@ -478,9 +509,14 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { ns := dbAddNewMockNamespace(ctrl, d, "testns") mediator := NewMockdatabaseMediator(ctrl) - mediator.EXPECT().IsOpen().Return(true) + mediator.EXPECT().IsOpen().Return(true).AnyTimes() + mediator.EXPECT().DisableFileOpsAndWait().AnyTimes() + mediator.EXPECT().EnableFileOps().AnyTimes() mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).DoAndReturn(func(fn func()) error { - fn() + // Spawn async since this method is called while DB holds + // lock and expects the mediator to call it asynchronously + // (which avoids deadlocking). + go fn() return nil }) mediator.EXPECT().Bootstrap().DoAndReturn(func() (BootstrapResult, error) { @@ -499,12 +535,11 @@ func TestDatabaseBootstrappedAssignShardSet(t *testing.T) { var wg sync.WaitGroup wg.Add(1) - mediator.EXPECT().BootstrapEnqueue().DoAndReturn(func() *BootstrapAsyncResult { - asyncResult := newBootstrapAsyncResult() - asyncResult.bootstrapStarted = &wg - wg.Done() - return asyncResult - }) + mediator.EXPECT(). + BootstrapEnqueue(gomock.Any()). 
+ Do(func(_ BootstrapEnqueueOptions) { + wg.Done() + }) d.AssignShardSet(shardSet) @@ -520,6 +555,7 @@ func TestDatabaseAssignShardSetShouldPanic(t *testing.T) { close(mapCh) }() + d.bootstraps = 1 mediator := NewMockdatabaseMediator(ctrl) mediator.EXPECT().IsOpen().Return(true) mediator.EXPECT().EnqueueMutuallyExclusiveFn(gomock.Any()).Return(errors.New("unknown error")) @@ -599,15 +635,7 @@ func TestDatabaseAddNamespace(t *testing.T) { nses := d.Namespaces() require.Len(t, nses, 2) - // construct new namespace Map - md1, err := namespace.NewMetadata(defaultTestNs1ID, defaultTestNs1Opts) - require.NoError(t, err) - md2, err := namespace.NewMetadata(defaultTestNs2ID, defaultTestNs2Opts) - require.NoError(t, err) - md3, err := namespace.NewMetadata(ident.StringID("and1"), defaultTestNs1Opts) - require.NoError(t, err) - nsMap, err := namespace.NewMap([]namespace.Metadata{md1, md2, md3}) - require.NoError(t, err) + md1, md2, _, nsMap := addNamespace(t, "and1") // update the database watch with new Map mapCh <- nsMap @@ -639,6 +667,173 @@ func TestDatabaseAddNamespace(t *testing.T) { ns3, ok := d.Namespace(ident.StringID("and1")) require.True(t, ok) require.Equal(t, md1.Options(), ns3.Options()) + assertFileOpsEnabled(t, d) +} + +type testNamespaceHooks struct { + sync.Mutex + adds int +} + +func (th *testNamespaceHooks) addCount() int { + th.Lock() + defer th.Unlock() + return th.adds +} + +func (th *testNamespaceHooks) OnCreatedNamespace(Namespace, GetNamespaceFn) error { + th.Lock() + defer th.Unlock() + th.adds++ + return nil +} + +func TestDatabaseAddNamespaceBootstrapEnqueue(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) + require.NoError(t, d.Open()) + defer func() { + close(mapCh) + require.NoError(t, d.Close()) + leaktest.CheckTimeout(t, time.Second)() + }() + + // retrieve the update channel to track propagation + updateCh := 
d.opts.NamespaceInitializer().(*mockNsInitializer).updateCh + + nsHooks := &testNamespaceHooks{} + d.opts = d.opts.SetNamespaceHooks(nsHooks) + d.bootstraps++ + + // check initial namespaces + nses := d.Namespaces() + require.Len(t, nses, 2) + + _, _, md3, nsMap := addNamespace(t, "nsNew") + + // update the database watch with new Map + mapCh <- nsMap + + // wait till the update has propagated + <-updateCh + <-updateCh + + // Because ns update will be enqueued and performed later, we need to wait for more time in theory. + // Usually this update should complete in a few seconds. + require.True(t, xclock.WaitUntil(func() bool { + return nsHooks.addCount() == 1 + }, 1*time.Minute)) + require.True(t, xclock.WaitUntil(func() bool { + return len(d.Namespaces()) == 3 + }, 2*time.Second)) + + // ensure the expected namespaces exist + nses = d.Namespaces() + require.Len(t, nses, 3) + ns3, ok := d.Namespace(ident.StringID("nsNew")) + require.True(t, ok) + require.Equal(t, md3.Options(), ns3.Options()) + assertFileOpsEnabled(t, d) +} + +type errorNamespaceHooks struct{} + +func (th *errorNamespaceHooks) OnCreatedNamespace(Namespace, GetNamespaceFn) error { + return errors.New("failed to create namespace") +} + +func TestDatabaseAddNamespaceErrorAfterWaitForFileOps(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) + require.NoError(t, d.Open()) + defer func() { + close(mapCh) + require.NoError(t, d.Close()) + leaktest.CheckTimeout(t, time.Second)() + }() + + nsHooks := &errorNamespaceHooks{} + d.opts = d.opts.SetNamespaceHooks(nsHooks) + + _, _, _, nsMap := addNamespace(t, "testns3") + d.bootstraps = 1 + + require.Error(t, d.UpdateOwnedNamespaces(nsMap)) + assertFileOpsEnabled(t, d) +} + +func TestDatabaseAddNamespaceBootstrapEnqueueMediatorClosed(t *testing.T) { + ctrl := xtest.NewController(t) + defer ctrl.Finish() + + d, mapCh, _ := defaultTestDatabase(t, ctrl, Bootstrapped) + 
 require.NoError(t, d.Open()) + mediator := d.mediator + defer func() { + close(mapCh) + d.mediator = mediator + require.NoError(t, d.Close()) + leaktest.CheckTimeout(t, time.Second)() + }() + + // retrieve the update channel to track propagation + updateCh := d.opts.NamespaceInitializer().(*mockNsInitializer).updateCh + + nsHooks := &testNamespaceHooks{} + d.opts = d.opts.SetNamespaceHooks(nsHooks) + mockMediator := NewMockdatabaseMediator(ctrl) + mockMediator.EXPECT().IsOpen().Return(false).AnyTimes() + mockMediator.EXPECT().BootstrapEnqueue(BootstrapEnqueueOptions{}) + d.mediator = mockMediator + + // check initial namespaces + nses := d.Namespaces() + require.Len(t, nses, 2) + + _, _, md3, nsMap := addNamespace(t, "testns3") + d.bootstraps = 1 + // update the database watch with new Map + mapCh <- nsMap + + // wait till the update has propagated + <-updateCh + <-updateCh + + // Because ns update will be enqueued and performed later, we need to wait for more time in theory. + // Usually this update should complete in a few seconds. 
+ require.True(t, xclock.WaitUntil(func() bool { + return nsHooks.addCount() == 1 + }, time.Minute)) + require.True(t, xclock.WaitUntil(func() bool { + return len(d.Namespaces()) == 3 + }, 2*time.Second)) + + // ensure the expected namespaces exist + nses = d.Namespaces() + require.Len(t, nses, 3) + ns3, ok := d.Namespace(ident.StringID("testns3")) + require.True(t, ok) + require.Equal(t, md3.Options(), ns3.Options()) +} + +func addNamespace( + t *testing.T, + ns string, +) (namespace.Metadata, namespace.Metadata, namespace.Metadata, namespace.Map) { + // construct new namespace Map + md1, err := namespace.NewMetadata(defaultTestNs1ID, defaultTestNs1Opts) + require.NoError(t, err) + md2, err := namespace.NewMetadata(defaultTestNs2ID, defaultTestNs2Opts) + require.NoError(t, err) + md3, err := namespace.NewMetadata(ident.StringID(ns), defaultTestNs1Opts) + require.NoError(t, err) + nsMap, err := namespace.NewMap([]namespace.Metadata{md1, md2, md3}) + require.NoError(t, err) + return md1, md2, md3, nsMap } func TestDatabaseUpdateNamespace(t *testing.T) { @@ -1358,10 +1553,10 @@ func TestDatabaseIsBootstrapped(t *testing.T) { close(mapCh) }() - mediator := NewMockdatabaseMediator(ctrl) - mediator.EXPECT().IsBootstrapped().Return(true) - mediator.EXPECT().IsBootstrapped().Return(false) - d.mediator = mediator + md := NewMockdatabaseMediator(ctrl) + md.EXPECT().IsBootstrapped().Return(true) + md.EXPECT().IsBootstrapped().Return(false) + d.mediator = md assert.True(t, d.IsBootstrapped()) assert.False(t, d.IsBootstrapped()) @@ -1552,3 +1747,17 @@ func TestNewAggregateTilesOptions(t *testing.T) { true, true, map[string]annotation.Payload{}, insOpts) assert.NoError(t, err) } + +func assertFileOpsEnabled(t *testing.T, d *db) { + mediator := d.mediator.(*mediator) + coldFlushManager := mediator.databaseColdFlushManager.(*coldFlushManager) + fileSystemManager := mediator.databaseFileSystemManager.(*fileSystemManager) + + coldFlushManager.RLock() + require.True(t, 
coldFlushManager.enabled) + coldFlushManager.RUnlock() + + fileSystemManager.RLock() + require.True(t, fileSystemManager.enabled) + fileSystemManager.RUnlock() +} diff --git a/src/dbnode/storage/mediator.go b/src/dbnode/storage/mediator.go index 25f7faafbd..d1008e1f7a 100644 --- a/src/dbnode/storage/mediator.go +++ b/src/dbnode/storage/mediator.go @@ -178,17 +178,17 @@ func (m *mediator) Open() error { } func (m *mediator) DisableFileOpsAndWait() { - status := m.databaseFileSystemManager.Disable() - for status == fileOpInProgress { - m.sleepFn(fileOpCheckInterval) - status = m.databaseFileSystemManager.Status() - } + fsStatus := m.databaseFileSystemManager.Disable() // Even though the cold flush runs separately, its still // considered a fs process. - status = m.databaseColdFlushManager.Disable() - for status == fileOpInProgress { + cfStatus := m.databaseColdFlushManager.Disable() + for fsStatus == fileOpInProgress { + m.sleepFn(fileOpCheckInterval) + fsStatus = m.databaseFileSystemManager.Status() + } + for cfStatus == fileOpInProgress { m.sleepFn(fileOpCheckInterval) - status = m.databaseColdFlushManager.Status() + cfStatus = m.databaseColdFlushManager.Status() } } diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index d1cd6433b5..8f130f9450 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -2912,17 +2912,15 @@ func (mr *MockdatabaseBootstrapManagerMockRecorder) Bootstrap() *gomock.Call { } // BootstrapEnqueue mocks base method. -func (m *MockdatabaseBootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult { +func (m *MockdatabaseBootstrapManager) BootstrapEnqueue(opts BootstrapEnqueueOptions) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BootstrapEnqueue") - ret0, _ := ret[0].(*BootstrapAsyncResult) - return ret0 + m.ctrl.Call(m, "BootstrapEnqueue", opts) } // BootstrapEnqueue indicates an expected call of BootstrapEnqueue. 
-func (mr *MockdatabaseBootstrapManagerMockRecorder) BootstrapEnqueue() *gomock.Call { +func (mr *MockdatabaseBootstrapManagerMockRecorder) BootstrapEnqueue(opts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapEnqueue", reflect.TypeOf((*MockdatabaseBootstrapManager)(nil).BootstrapEnqueue)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapEnqueue", reflect.TypeOf((*MockdatabaseBootstrapManager)(nil).BootstrapEnqueue), opts) } // IsBootstrapped mocks base method. @@ -3627,17 +3625,15 @@ func (mr *MockdatabaseMediatorMockRecorder) Bootstrap() *gomock.Call { } // BootstrapEnqueue mocks base method. -func (m *MockdatabaseMediator) BootstrapEnqueue() *BootstrapAsyncResult { +func (m *MockdatabaseMediator) BootstrapEnqueue(opts BootstrapEnqueueOptions) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "BootstrapEnqueue") - ret0, _ := ret[0].(*BootstrapAsyncResult) - return ret0 + m.ctrl.Call(m, "BootstrapEnqueue", opts) } // BootstrapEnqueue indicates an expected call of BootstrapEnqueue. -func (mr *MockdatabaseMediatorMockRecorder) BootstrapEnqueue() *gomock.Call { +func (mr *MockdatabaseMediatorMockRecorder) BootstrapEnqueue(opts interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapEnqueue", reflect.TypeOf((*MockdatabaseMediator)(nil).BootstrapEnqueue)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapEnqueue", reflect.TypeOf((*MockdatabaseMediator)(nil).BootstrapEnqueue), opts) } // Close mocks base method. diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index e6d9e3a345..e3ef369210 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -849,49 +849,30 @@ type databaseBootstrapManager interface { Bootstrap() (BootstrapResult, error) // BootstrapEnqueue performs bootstrapping asynchronously for all namespaces and shards owned. 
- BootstrapEnqueue() *BootstrapAsyncResult + BootstrapEnqueue(opts BootstrapEnqueueOptions) // Report reports runtime information. Report() } +// BootstrapCompleteFn is a callback for when bootstrap is complete when using +// BootstrapEnqueue method. +type BootstrapCompleteFn func(BootstrapResult) + +// BootstrapEnqueueOptions is options to pass to BootstrapEnqueue when +// enqueuing a bootstrap. +type BootstrapEnqueueOptions struct { + // OnCompleteFn is an optional function to pass to execute once + // the set of queued bootstraps are complete. + OnCompleteFn BootstrapCompleteFn +} + // BootstrapResult is a bootstrap result. type BootstrapResult struct { ErrorsBootstrap []error AlreadyBootstrapping bool } -// BootstrapAsyncResult is a bootstrap async result. -type BootstrapAsyncResult struct { - bootstrapStarted *sync.WaitGroup - bootstrapCompleted *sync.WaitGroup - bootstrapResult BootstrapResult -} - -func newBootstrapAsyncResult() *BootstrapAsyncResult { - var ( - wgStarted sync.WaitGroup - wgCompleted sync.WaitGroup - ) - wgStarted.Add(1) - wgCompleted.Add(1) - return &BootstrapAsyncResult{ - bootstrapStarted: &wgStarted, - bootstrapCompleted: &wgCompleted, - } -} - -// Result will wait for bootstrap to complete and return BootstrapResult. -func (b *BootstrapAsyncResult) Result() BootstrapResult { - b.bootstrapCompleted.Wait() - return b.bootstrapResult -} - -// WaitForStart waits until bootstrap has been started. -func (b *BootstrapAsyncResult) WaitForStart() { - b.bootstrapStarted.Wait() -} - // databaseFlushManager manages flushing in-memory data to persistent storage. type databaseFlushManager interface { // Flush flushes in-memory data to persistent storage. @@ -1044,8 +1025,8 @@ type databaseMediator interface { // Bootstrap bootstraps the database with file operations performed at the end. Bootstrap() (BootstrapResult, error) - // BootstrapEnqueue bootstraps the database asynchronously with file operations performed at the end. 
- BootstrapEnqueue() *BootstrapAsyncResult + // BootstrapEnqueue performs bootstrapping asynchronously for all namespaces and shards owned. + BootstrapEnqueue(opts BootstrapEnqueueOptions) // DisableFileOpsAndWait disables file operations. DisableFileOpsAndWait() diff --git a/src/x/context/context.go b/src/x/context/context.go index b636df2f6d..d2d0ecd3dd 100644 --- a/src/x/context/context.go +++ b/src/x/context/context.go @@ -345,15 +345,29 @@ func (c *ctx) DistanceFromRootContext() uint16 { return distanceFromRootContext } +func (c *ctx) CheckedAndNotSampled() bool { + c.RLock() + checkedAndNotSampled := c.checkedAndNotSampled + c.RUnlock() + + return checkedAndNotSampled +} + +func (c *ctx) setCheckedAndNotSampled(b bool) { + c.Lock() + c.checkedAndNotSampled = b + c.Unlock() +} + func (c *ctx) StartSampledTraceSpan(name string) (Context, opentracing.Span, bool) { - if c.checkedAndNotSampled || c.DistanceFromRootContext() >= maxDistanceFromRootContext { + if c.CheckedAndNotSampled() || c.DistanceFromRootContext() >= maxDistanceFromRootContext { return c, noopTracer.StartSpan(name), false } goCtx := c.GoContext() childGoCtx, span, sampled := StartSampledTraceSpan(goCtx, name) if !sampled { - c.checkedAndNotSampled = true + c.setCheckedAndNotSampled(true) return c, noopTracer.StartSpan(name), false }