Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Get rid of excessive locking when adding new namespaces #3765

Merged
merged 29 commits into from
Oct 6, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
d5a12ad
Set `Bootstrapping` state when waiting for fileOps to be disabled com…
soundvibe Sep 21, 2021
3e0773b
Release database lock before enqueueing bootstrap because it could ta…
soundvibe Sep 22, 2021
0c4bd0b
Rollback bootstrapping state change.
soundvibe Sep 22, 2021
faef9d7
disable/enable fileOps when adding a new namespace.
soundvibe Sep 22, 2021
fe6a112
removed comments added for testing.
soundvibe Sep 22, 2021
0ea2556
fixed linter issues.
soundvibe Sep 22, 2021
9f6fb20
fixed potential issue when competing bootstraps could be started and …
soundvibe Sep 22, 2021
f2cf79c
fixed typo in comment.
soundvibe Sep 22, 2021
c35dd26
implemented similar enqueue bootstrap logics to `assignShardSet` as w…
soundvibe Sep 23, 2021
ff93b1c
Disable both file ops first and then wait for them to stop (should re…
soundvibe Sep 23, 2021
b8d552c
Merge branch 'master' into linasn/bootstrap-after-fileops-wait
soundvibe Sep 23, 2021
194ac64
Merge branch 'master' into linasn/bootstrap-after-fileops-wait
soundvibe Sep 23, 2021
f3fcdb4
New namespace adds can now be enqueued and be non-blocking.
soundvibe Sep 24, 2021
d7383aa
small cleanup
soundvibe Sep 24, 2021
08f755c
Merge branch 'master' into linasn/bootstrap-after-fileops-wait
soundvibe Sep 24, 2021
0737c60
reverse commit
soundvibe Sep 24, 2021
b2ac042
We can enqueue bootstrap with lock to avoid potential race now becaus…
soundvibe Sep 24, 2021
e537318
Merge branch 'master' into linasn/bootstrap-after-fileops-wait
soundvibe Sep 24, 2021
db80e1e
unlock right after `namespaceDeltaWithLock()`
soundvibe Oct 4, 2021
ff41d46
Merge branch 'master' into linasn/bootstrap-after-fileops-wait
soundvibe Oct 4, 2021
d8301c9
Simplify callback logic by having the mediator own the lifecycle of t…
robskillington Oct 5, 2021
56a5027
Added test when bootstrap is enqueued in case new namespaces are added.
soundvibe Oct 5, 2021
c5758a3
Merge branch 'master' into linasn/bootstrap-after-fileops-wait
soundvibe Oct 5, 2021
1886a24
Merge branch 'master' into linasn/bootstrap-after-fileops-wait
robskillington Oct 6, 2021
5b5978a
Simplify some code paths further and use lock/defer unlock in more pl…
robskillington Oct 6, 2021
e09cc8b
More test coverage.
soundvibe Oct 6, 2021
52b554e
Fixed possible race in unit test assertion.
soundvibe Oct 6, 2021
44a7226
Merge branch 'master' into linasn/bootstrap-after-fileops-wait
soundvibe Oct 6, 2021
15fa0cd
Merge branch 'master' into linasn/bootstrap-after-fileops-wait
soundvibe Oct 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type bootstrapManager struct {
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
pendingOnCompleteFns []BootstrapCompleteFn
sleepFn sleepFn
nowFn clock.NowFn
lastBootstrapCompletionTime xtime.UnixNano
Expand Down Expand Up @@ -116,23 +117,29 @@ func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool)
return bsTime, bsTime > 0
}

func (m *bootstrapManager) BootstrapEnqueue() *BootstrapAsyncResult {
bootstrapAsyncResult := newBootstrapAsyncResult()
go func(r *BootstrapAsyncResult) {
if result, err := m.startBootstrap(r); err != nil && !result.AlreadyBootstrapping {
func (m *bootstrapManager) BootstrapEnqueue(
opts BootstrapEnqueueOptions,
) {
go func() {
result, err := m.startBootstrap(opts.OnCompleteFn)
if err != nil && !result.AlreadyBootstrapping {
m.instrumentation.emitAndLogInvariantViolation(err, "error bootstrapping")
}
}(bootstrapAsyncResult)
return bootstrapAsyncResult
}()
}

func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
bootstrapAsyncResult := newBootstrapAsyncResult()
return m.startBootstrap(bootstrapAsyncResult)
return m.startBootstrap(nil)
}

func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (BootstrapResult, error) {
func (m *bootstrapManager) startBootstrap(
onCompleteFn BootstrapCompleteFn,
) (BootstrapResult, error) {
m.Lock()
if onCompleteFn != nil {
// Append completion fn if specified.
m.pendingOnCompleteFns = append(m.pendingOnCompleteFns, onCompleteFn)
}
switch m.state {
case Bootstrapping:
// NB(r): Already bootstrapping, now a consequent bootstrap
Expand All @@ -144,26 +151,19 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo
m.hasPending = true
m.Unlock()
result := BootstrapResult{AlreadyBootstrapping: true}
asyncResult.bootstrapResult = result
asyncResult.bootstrapStarted.Done()
asyncResult.bootstrapCompleted.Done()
return result, errBootstrapEnqueued
default:
m.state = Bootstrapping
}
m.Unlock()
// NB(xichen): disable filesystem manager before we bootstrap to minimize
// the impact of file operations on bootstrapping performance
m.instrumentation.log.Info("disable fileOps and wait")
m.mediator.DisableFileOpsAndWait()
defer m.mediator.EnableFileOps()
m.instrumentation.log.Info("fileOps disabled")

var result BootstrapResult
asyncResult.bootstrapStarted.Done()
defer func() {
asyncResult.bootstrapResult = result
asyncResult.bootstrapCompleted.Done()
}()

// Keep performing bootstraps until none pending and no error returned.
for i := 0; true; i++ {
// NB(r): Decouple implementation of bootstrap so can override in tests.
Expand Down Expand Up @@ -209,7 +209,19 @@ func (m *bootstrapManager) startBootstrap(asyncResult *BootstrapAsyncResult) (Bo
m.Lock()
m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn())
m.state = Bootstrapped
// NB(r): Clear out the pending completion functions and execute them if
// needed.
pendingOnCompleteFns := m.pendingOnCompleteFns
m.pendingOnCompleteFns = nil
m.Unlock()

if len(pendingOnCompleteFns) > 0 {
// Execute any on complete functions that were queued.
for _, fn := range pendingOnCompleteFns {
fn(result)
}
}

return result, nil
}

Expand Down
12 changes: 9 additions & 3 deletions src/dbnode/storage/bootstrap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,15 @@ func testDatabaseBootstrapWithBootstrapError(t *testing.T, async bool) {

var result BootstrapResult
if async {
asyncResult := bsm.BootstrapEnqueue()
asyncResult.WaitForStart()
result = asyncResult.Result()
var wg sync.WaitGroup
wg.Add(1)
bsm.BootstrapEnqueue(BootstrapEnqueueOptions{
OnCompleteFn: func(r BootstrapResult) {
result = r
wg.Done()
},
})
wg.Wait()
} else {
result, err = bsm.Bootstrap()
require.NoError(t, err)
Expand Down
135 changes: 99 additions & 36 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,20 +292,12 @@ func (d *db) UpdateOwnedNamespaces(newNamespaces namespace.Map) error {
}
}

d.Lock()
defer d.Unlock()

d.RLock()
removes, adds, updates := d.namespaceDeltaWithLock(newNamespaces)
noBootstrapNeeded := d.bootstraps == 0
d.RUnlock()
if err := d.logNamespaceUpdate(removes, adds, updates); err != nil {
enrichedErr := fmt.Errorf("unable to log namespace updates: %v", err)
d.log.Error(enrichedErr.Error())
return enrichedErr
}

// add any namespaces marked for addition
if err := d.addNamespacesWithLock(adds); err != nil {
enrichedErr := fmt.Errorf("unable to add namespaces: %v", err)
d.log.Error(enrichedErr.Error())
d.log.Error("unable to log namespace updates", zap.Error(err))
return err
}

Expand All @@ -317,14 +309,61 @@ func (d *db) UpdateOwnedNamespaces(newNamespaces namespace.Map) error {
"restart the process if you want changes to take effect")
}

// enqueue bootstraps if new namespaces
if len(adds) > 0 {
d.queueBootstrapWithLock()
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
}
if noBootstrapNeeded {
d.Lock()
defer d.Unlock()
return d.addNamespacesWithLock(adds)
}

if mediator := d.mediator; mediator == nil || !mediator.IsOpen() {
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
d.log.Info("mediator is closed, adding namespaces")
d.Lock()
if err := d.addNamespacesWithLock(adds); err != nil {
d.Unlock()
d.log.Error("unable to add namespaces", zap.Error(err))
return err
}
d.Unlock()

d.enqueueBootstrap()
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

// NB: mediator is opened, we need to disable fileOps and wait for all the background processes to complete
// so that we could update namespaces safely. Otherwise, there is a high chance in getting
// invariant violation panic because cold/warm flush will receive new namespaces
// in the middle of their operations.
d.disableFileOpsAndWait()
// add any namespaces marked for addition
d.Lock()
if err := d.addNamespacesWithLock(adds); err != nil {
d.Unlock()
d.log.Error("unable to add namespaces", zap.Error(err))
d.enableFileOps()
return err
}
// enqueue bootstrap and enable file ops when bootstrap is completed
d.enqueueBootstrapWithLock(d.enableFileOps)
d.Unlock()
}
return nil
}

func (d *db) disableFileOpsAndWait() {
if mediator := d.mediator; mediator != nil && mediator.IsOpen() {
d.log.Info("waiting for file ops to be disabled")
mediator.DisableFileOpsAndWait()
}
}

func (d *db) enableFileOps() {
if mediator := d.mediator; mediator != nil && mediator.IsOpen() {
d.log.Info("enabling file ops")
mediator.EnableFileOps()
}
}

func (d *db) namespaceDeltaWithLock(newNamespaces namespace.Map) ([]ident.ID, []namespace.Metadata, []namespace.Metadata) {
var (
existing = d.namespaces
Expand Down Expand Up @@ -459,10 +498,20 @@ func (d *db) AssignShardSet(shardSet sharding.ShardSet) {
if receivedNewShards {
d.lastReceivedNewShards = d.nowFn()
}
if d.bootstraps == 0 {
// no need to enqueue bootstrap, just assign shards and return
d.assignShardsWithLock(shardSet)
d.Unlock()
return
}
d.Unlock()

if !d.mediator.IsOpen() {
d.assignShardSet(shardSet)
if mediator := d.mediator; mediator == nil || !mediator.IsOpen() {
d.Lock()
d.assignShardsWithLock(shardSet)
d.Unlock()

d.enqueueBootstrap()
return
}

Expand All @@ -480,28 +529,32 @@ func (d *db) AssignShardSet(shardSet sharding.ShardSet) {
}

func (d *db) assignShardSet(shardSet sharding.ShardSet) {
d.RLock()
receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet)
d.RUnlock()

if receivedNewShards {
d.disableFileOpsAndWait()

d.Lock()
d.assignShardsWithLock(shardSet)
d.enqueueBootstrapWithLock(d.enableFileOps)
d.Unlock()
return
}

d.Lock()
defer d.Unlock()
d.assignShardsWithLock(shardSet)
d.Unlock()
}

func (d *db) assignShardsWithLock(shardSet sharding.ShardSet) {
d.log.Info("assigning shards", zap.Uint32s("shards", shardSet.AllIDs()))

receivedNewShards := d.hasReceivedNewShardsWithLock(shardSet)
d.shardSet = shardSet

for _, elem := range d.namespaces.Iter() {
ns := elem.Value()
ns.AssignShardSet(shardSet)
}

if receivedNewShards {
// Only trigger a bootstrap if the node received new shards otherwise
// the nodes will perform lots of small bootstraps (that accomplish nothing)
// during topology changes as other nodes mark their shards as available.
//
// These small bootstraps can significantly delay topology changes as they prevent
// the nodes from marking themselves as bootstrapped and durable, for example.
d.queueBootstrapWithLock()
}
}

func (d *db) hasReceivedNewShardsWithLock(incoming sharding.ShardSet) bool {
Expand Down Expand Up @@ -533,7 +586,12 @@ func (d *db) ShardSet() sharding.ShardSet {
return shardSet
}

func (d *db) queueBootstrapWithLock() {
func (d *db) enqueueBootstrap() {
d.log.Info("enqueuing bootstrap")
d.mediator.BootstrapEnqueue(BootstrapEnqueueOptions{})
}

func (d *db) enqueueBootstrapWithLock(onCompleteFn func()) {
// Only perform a bootstrap if at least one bootstrap has already occurred. This enables
// the ability to open the clustered database and assign shardsets to the non-clustered
// database when it receives an initial topology (as well as topology changes) without
Expand All @@ -542,11 +600,16 @@ func (d *db) queueBootstrapWithLock() {
// the non-clustered database bootstrapped by assigning it shardsets which will trigger new
// bootstraps since d.bootstraps > 0 will be true.
if d.bootstraps > 0 {
bootstrapAsyncResult := d.mediator.BootstrapEnqueue()
// NB(linasn): We need to wait for the bootstrap to start and set it's state to Bootstrapping in order
// to safely run fileOps in mediator later.
bootstrapAsyncResult.WaitForStart()
d.log.Info("enqueuing bootstrap with onComplete function")
d.mediator.BootstrapEnqueue(BootstrapEnqueueOptions{
OnCompleteFn: func(_ BootstrapResult) {
onCompleteFn()
},
})
return
}

onCompleteFn()
}

func (d *db) Namespace(id ident.ID) (Namespace, bool) {
Expand Down
Loading