Skip to content

Commit

Permalink
[dbnode] Support dynamically registered background processes in media…
Browse files Browse the repository at this point in the history
…tor (#2634)

* [dbnode] Support dynamically registered background processes in mediator

* [dbnode] Pass background processes via RunOptions

* Add a comment for BackgroundProcess

* Add integration test TestRunCustomBackgroundProcess

* Include backgroundProcess in TestDatabaseMediatorOpenClose

* Move the source of BackgroundProcess asynchrony outside for consistency/easier testing

* Fix test

* Formatting

* Update BackgroundProcess doc

* go mod tidy

* Verify reporting in TestRunCustomBackgroundProcess

* Address review feedback

* Resurrect BackgroundProcess.Start()

* Enforce mediator.RegisterBackgroundProcess before Open

* Remove locking from mediator.Report

* Fix comment
  • Loading branch information
linasm authored Oct 2, 2020
1 parent ef9ba0a commit 5bcb1ba
Show file tree
Hide file tree
Showing 14 changed files with 365 additions and 60 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ require (
github.com/stretchr/testify v1.6.1
github.com/subosito/gotenv v1.2.1-0.20190917103637-de67a6614a4d // indirect
github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553
github.com/uber-go/atomic v0.0.0-00010101000000-000000000000
github.com/uber-go/tally v3.3.13+incompatible
github.com/uber/jaeger-client-go v2.25.0+incompatible
github.com/uber/jaeger-lib v2.2.0+incompatible
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,8 @@ github.com/twmb/murmur3 v1.1.4 h1:NnlAxelwOgdQDmYuV0T/K+tpDQ/8wdsDVOGmvUqBOCw=
github.com/twmb/murmur3 v1.1.4/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553 h1:DRC1ubdb3ZmyyIeCSTxjZIQAnpLPfKVgYrLETQuOPjo=
github.com/twotwotwo/sorts v0.0.0-20160814051341-bf5c1f2b8553/go.mod h1:Rj7Csq/tZ/egz+Ltc2IVpsA5309AmSMEswjkTZmq2Xc=
github.com/uber-go/atomic v1.4.0 h1:yOuPqEq4ovnhEjpHmfFwsqBXDYbQeT6Nb0bwD6XnD5o=
github.com/uber-go/atomic v1.4.0/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber-go/tally v3.3.13+incompatible h1:5ic2UsDwjcWsw9jvEdWEE2XsmGCLMTt5Ukg4d74fed4=
github.com/uber-go/tally v3.3.13+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/jaeger-client-go v2.25.0+incompatible h1:IxcNZ7WRY1Y3G4poYlx24szfsn/3LvK9QHCq9oQw8+U=
Expand Down
22 changes: 22 additions & 0 deletions src/dbnode/integration/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ const (

// defaultWriteNewSeriesAsync inserts, and index series' synchronously by default.
defaultWriteNewSeriesAsync = false

// defaultReportInterval is the default time interval of reporting metrics within the system.
defaultReportInterval = time.Second
)

var (
Expand Down Expand Up @@ -283,6 +286,12 @@ type TestOptions interface {

// NowFn returns the now fn.
NowFn() func() time.Time

// SetReportInterval sets the time between reporting metrics within the system.
SetReportInterval(value time.Duration) TestOptions

// ReportInterval returns the time between reporting metrics within the system.
ReportInterval() time.Duration
}

type options struct {
Expand Down Expand Up @@ -316,6 +325,7 @@ type options struct {
protoEncoding bool
assertEqual assertTestDataEqual
nowFn func() time.Time
reportInterval time.Duration
}

// NewTestOptions returns a new set of integration test options.
Expand Down Expand Up @@ -349,6 +359,7 @@ func NewTestOptions(t *testing.T) TestOptions {
useTChannelClientForWriting: defaultUseTChannelClientForWriting,
useTChannelClientForTruncation: defaultUseTChannelClientForTruncation,
writeNewSeriesAsync: defaultWriteNewSeriesAsync,
reportInterval: defaultReportInterval,
}
}

Expand Down Expand Up @@ -382,6 +393,7 @@ func (o *options) SetID(value string) TestOptions {
func (o *options) ID() string {
return o.id
}

func (o *options) SetTickMinimumInterval(value time.Duration) TestOptions {
opts := *o
opts.tickMinimumInterval = value
Expand Down Expand Up @@ -653,3 +665,13 @@ func (o *options) SetNowFn(value func() time.Time) TestOptions {
func (o *options) NowFn() func() time.Time {
return o.nowFn
}

func (o *options) SetReportInterval(value time.Duration) TestOptions {
opts := *o
opts.reportInterval = value
return &opts
}

func (o *options) ReportInterval() time.Duration {
return o.reportInterval
}
98 changes: 98 additions & 0 deletions src/dbnode/integration/run_custom_background_process_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// +build integration

// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/storage"
xclock "github.com/m3db/m3/src/x/clock"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/atomic"
)

type testBackgroundProcess struct {
executed, reported, stopped atomic.Int32
}

func (p *testBackgroundProcess) run() {
xclock.WaitUntil(func() bool {
return p.reported.Load() > 0
}, time.Minute)
p.executed.Inc()
xclock.WaitUntil(func() bool {
return p.stopped.Load() > 0
}, time.Minute)
}

func (p *testBackgroundProcess) Start() {
go p.run()
}

func (p *testBackgroundProcess) Stop() {
p.stopped.Inc()
return
}

func (p *testBackgroundProcess) Report() {
p.reported.Inc()
return
}

func TestRunCustomBackgroundProcess(t *testing.T) {
testOpts := NewTestOptions(t).SetReportInterval(time.Millisecond)
testSetup := newTestSetupWithCommitLogAndFilesystemBootstrapper(t, testOpts)
defer testSetup.Close()

backgroundProcess := &testBackgroundProcess{}

storageOpts := testSetup.StorageOpts().
SetBackgroundProcessFns([]storage.NewBackgroundProcessFn{
func(storage.Database, storage.Options) (storage.BackgroundProcess, error) {
return backgroundProcess, nil
}})
testSetup.SetStorageOpts(storageOpts)

log := storageOpts.InstrumentOptions().Logger()

// Start the server.
require.NoError(t, testSetup.StartServer())

// Stop the server.
defer func() {
require.NoError(t, testSetup.StopServer())
log.Debug("server is now down")
assert.Equal(t, int32(1), backgroundProcess.stopped.Load(), "failed to stop")
}()

log.Info("waiting for the custom background process to execute")
xclock.WaitUntil(func() bool {
return backgroundProcess.executed.Load() > 0
}, time.Minute)

assert.Equal(t, int32(1), backgroundProcess.executed.Load(), "failed to execute")
assert.True(t, backgroundProcess.reported.Load() > 0, "failed to report")
}
3 changes: 3 additions & 0 deletions src/dbnode/integration/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ func NewTestSetup(
storageOpts = storageOpts.SetDatabaseBlockOptions(blockOpts)
}

storageOpts = storageOpts.SetInstrumentOptions(
storageOpts.InstrumentOptions().SetReportInterval(opts.ReportInterval()))

// Set debugging options if environment vars set
if debugFilePrefix := os.Getenv("TEST_DEBUG_FILE_PREFIX"); debugFilePrefix != "" {
opts = opts.SetVerifySeriesDebugFilePathPrefix(debugFilePrefix)
Expand Down
1 change: 1 addition & 0 deletions src/dbnode/server/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ type StorageOptions struct {
OnColdFlush storage.OnColdFlush
ForceColdWritesEnabled bool
TChanNodeServerFn node.NewTChanNodeServerFn
BackgroundProcessFns []storage.NewBackgroundProcessFn
NamespaceHooks storage.NamespaceHooks
}
2 changes: 2 additions & 0 deletions src/dbnode/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,8 @@ func Run(runOpts RunOptions) {
opts = opts.SetOnColdFlush(runOpts.StorageOptions.OnColdFlush)
}

opts = opts.SetBackgroundProcessFns(append(opts.BackgroundProcessFns(), runOpts.StorageOptions.BackgroundProcessFns...))

if runOpts.StorageOptions.NamespaceHooks != nil {
opts = opts.SetNamespaceHooks(runOpts.StorageOptions.NamespaceHooks)
}
Expand Down
29 changes: 26 additions & 3 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type db struct {

state databaseState
mediator databaseMediator
repairer databaseRepairer

created uint64
bootstraps int
Expand Down Expand Up @@ -229,12 +230,34 @@ func NewDatabase(
zap.Error(err))
}

mediator, err := newMediator(
d.mediator, err = newMediator(
d, commitLog, opts.SetInstrumentOptions(databaseIOpts))
if err != nil {
return nil, err
}
d.mediator = mediator

d.repairer = newNoopDatabaseRepairer()
if opts.RepairEnabled() {
d.repairer, err = newDatabaseRepairer(d, opts)
if err != nil {
return nil, err
}
err = d.mediator.RegisterBackgroundProcess(d.repairer)
if err != nil {
return nil, err
}
}

for _, fn := range opts.BackgroundProcessFns() {
process, err := fn(d, opts)
if err != nil {
return nil, err
}
err = d.mediator.RegisterBackgroundProcess(process)
if err != nil {
return nil, err
}
}

return d, nil
}
Expand Down Expand Up @@ -1036,7 +1059,7 @@ func (d *db) IsBootstrappedAndDurable() bool {
}

func (d *db) Repair() error {
return d.mediator.Repair()
return d.repairer.Repair()
}

func (d *db) Truncate(namespace ident.ID) (int64, error) {
Expand Down
44 changes: 29 additions & 15 deletions src/dbnode/storage/mediator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ type (
)

const (
fileOpCheckInterval = time.Second
fileSystemProcessesCheckInterval = 100 * time.Millisecond
fileOpCheckInterval = time.Second

mediatorNotOpen mediatorState = iota
mediatorOpen
Expand All @@ -51,7 +50,6 @@ var (
errMediatorAlreadyOpen = errors.New("mediator is already open")
errMediatorNotOpen = errors.New("mediator is not open")
errMediatorAlreadyClosed = errors.New("mediator is already closed")
errMediatorTimeBarrierAlreadyWaiting = errors.New("mediator time barrier already has a waiter")
errMediatorTimeTriedToProgressBackwards = errors.New("mediator time tried to progress backwards")
)

Expand All @@ -78,7 +76,6 @@ type mediator struct {
databaseFileSystemManager
databaseColdFlushManager
databaseTickManager
databaseRepairer

opts Options
nowFn clock.NowFn
Expand All @@ -88,6 +85,7 @@ type mediator struct {
mediatorTimeBarrier mediatorTimeBarrier
closedCh chan struct{}
tickInterval time.Duration
backgroundProcesses []BackgroundProcess
}

// TODO(r): Consider renaming "databaseMediator" to "databaseCoordinator"
Expand Down Expand Up @@ -123,31 +121,40 @@ func newMediator(database database, commitlog commitlog.CommitLog, opts Options)
cfm := newColdFlushManager(database, pm, opts)
d.databaseColdFlushManager = cfm

d.databaseRepairer = newNoopDatabaseRepairer()
if opts.RepairEnabled() {
d.databaseRepairer, err = newDatabaseRepairer(database, opts)
if err != nil {
return nil, err
}
}

d.databaseTickManager = newTickManager(database, opts)
d.databaseBootstrapManager = newBootstrapManager(database, d, opts)
return d, nil
}

func (m *mediator) RegisterBackgroundProcess(process BackgroundProcess) error {
m.Lock()
defer m.Unlock()

if m.state != mediatorNotOpen {
return errMediatorAlreadyOpen
}

m.backgroundProcesses = append(m.backgroundProcesses, process)
return nil
}

func (m *mediator) Open() error {
m.Lock()
defer m.Unlock()
if m.state != mediatorNotOpen {
return errMediatorAlreadyOpen
}
m.state = mediatorOpen

go m.reportLoop()
go m.ongoingFileSystemProcesses()
go m.ongoingColdFlushProcesses()
go m.ongoingTick()
m.databaseRepairer.Start()

for _, process := range m.backgroundProcesses {
process.Start()
}

return nil
}

Expand Down Expand Up @@ -175,9 +182,12 @@ func (m *mediator) EnableFileOps() {

func (m *mediator) Report() {
m.databaseBootstrapManager.Report()
m.databaseRepairer.Report()
m.databaseFileSystemManager.Report()
m.databaseColdFlushManager.Report()

for _, process := range m.backgroundProcesses {
process.Report()
}
}

func (m *mediator) Close() error {
Expand All @@ -191,7 +201,11 @@ func (m *mediator) Close() error {
}
m.state = mediatorClosed
close(m.closedCh)
m.databaseRepairer.Stop()

for _, process := range m.backgroundProcesses {
process.Stop()
}

return nil
}

Expand Down
Loading

0 comments on commit 5bcb1ba

Please sign in to comment.