Skip to content

Commit

Permalink
Additional metrics for peers bootstrapper (#3060)
Browse files Browse the repository at this point in the history
* refactored instrumentation code for peers bootstrapper.
added additional timers for peers bootstrapper to track individual steps.

* fixed linter warnings

* removed dead code

* removed dead code

* changes after review

* added new gauge - bootstrap-retries

* moved some instrumentation methods to instrumentationContexts for better encapsulation and easier usage.

* forgot to set nowFn

* added methods for creating instrumentation contexts.

* changes after code review
  • Loading branch information
soundvibe authored Jan 14, 2021
1 parent 482c510 commit eb36d4b
Show file tree
Hide file tree
Showing 6 changed files with 416 additions and 151 deletions.
2 changes: 1 addition & 1 deletion src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3857,7 +3857,7 @@ func (c *enqueueCh) enqueueDelayed(numToEnqueue int) (enqueueDelayedFn, enqueueD
return nil, nil, errEnqueueChIsClosed
}
c.sending++ // NB(r): This is decremented by calling the returned enqueue done function
c.enqueued += (numToEnqueue)
c.enqueued += numToEnqueue
c.Unlock()
return c.enqueueDelayedFn, c.enqueueDelayedDoneFn, nil
}
Expand Down
109 changes: 31 additions & 78 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,7 @@ import (
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
Expand Down Expand Up @@ -70,42 +65,38 @@ const (

type bootstrapFn func() error

type bootstrapNamespace struct {
namespace databaseNamespace
shards []databaseShard
}

type bootstrapManager struct {
sync.RWMutex

database database
mediator databaseMediator
opts Options
log *zap.Logger
bootstrapFn bootstrapFn
nowFn clock.NowFn
sleepFn sleepFn
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
status tally.Gauge
bootstrapDuration tally.Timer
durableStatus tally.Gauge
sleepFn sleepFn
nowFn clock.NowFn
lastBootstrapCompletionTime xtime.UnixNano
instrumentation *bootstrapInstrumentation
}

func newBootstrapManager(
database database,
mediator databaseMediator,
opts Options,
) databaseBootstrapManager {
scope := opts.InstrumentOptions().MetricsScope()
m := &bootstrapManager{
database: database,
mediator: mediator,
opts: opts,
log: opts.InstrumentOptions().Logger(),
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
processProvider: opts.BootstrapProcessProvider(),
status: scope.Gauge("bootstrapped"),
bootstrapDuration: scope.Timer("bootstrap-duration"),
durableStatus: scope.Gauge("bootstrapped-durable"),
database: database,
mediator: mediator,
processProvider: opts.BootstrapProcessProvider(),
sleepFn: time.Sleep,
nowFn: opts.ClockOptions().NowFn(),
instrumentation: newBootstrapInstrumentation(opts),
}
m.bootstrapFn = m.bootstrap
return m
Expand Down Expand Up @@ -176,9 +167,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// NB(r): Last bootstrap failed, since this could be due to transient
// failure we retry the bootstrap again. This is to avoid operators
// needing to manually intervene for cases where failures are transient.
m.log.Warn("retrying bootstrap after backoff",
zap.Duration("backoff", bootstrapRetryInterval),
zap.Int("numRetries", i+1))
m.instrumentation.bootstrapFailed(i + 1)
m.sleepFn(bootstrapRetryInterval)
continue
}
Expand All @@ -202,22 +191,8 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
}

func (m *bootstrapManager) Report() {
if m.IsBootstrapped() {
m.status.Update(1)
} else {
m.status.Update(0)
}

if m.database.IsBootstrappedAndDurable() {
m.durableStatus.Update(1)
} else {
m.durableStatus.Update(0)
}
}

type bootstrapNamespace struct {
namespace databaseNamespace
shards []databaseShard
m.instrumentation.setIsBootstrapped(m.IsBootstrapped())
m.instrumentation.setIsBootstrappedAndDurable(m.database.IsBootstrappedAndDurable())
}

func (m *bootstrapManager) bootstrap() error {
Expand All @@ -236,25 +211,20 @@ func (m *bootstrapManager) bootstrap() error {
return err
}

instrCtx := m.instrumentation.bootstrapPreparing()

accmulators := make([]bootstrap.NamespaceDataAccumulator, 0, len(namespaces))
defer func() {
// Close all accumulators at bootstrap completion, only error
// it returns is if already closed, so this is a code bug if ever
// an error returned.
for _, accumulator := range accmulators {
if err := accumulator.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("could not close bootstrap data accumulator",
zap.Error(err))
})
instrCtx.emitAndLogInvariantViolation(err, "could not close bootstrap data accumulator")
}
}
}()

start := m.nowFn()
m.log.Info("bootstrap prepare")

var (
bootstrapNamespaces = make([]bootstrapNamespace, len(namespaces))
prepareWg sync.WaitGroup
Expand Down Expand Up @@ -288,7 +258,7 @@ func (m *bootstrapManager) bootstrap() error {
prepareWg.Wait()

if err := prepareMultiErr.FinalError(); err != nil {
m.log.Error("bootstrap prepare failed", zap.Error(err))
m.instrumentation.bootstrapPrepareFailed(err)
return err
}

Expand Down Expand Up @@ -329,26 +299,17 @@ func (m *bootstrapManager) bootstrap() error {
})
}

logFields := []zapcore.Field{
zap.Int("numShards", len(uniqueShards)),
}
m.log.Info("bootstrap started", logFields...)

instrCtx.bootstrapStarted(len(uniqueShards))
// Run the bootstrap.
bootstrapResult, err := process.Run(ctx, start, targets)

bootstrapDuration := m.nowFn().Sub(start)
m.bootstrapDuration.Record(bootstrapDuration)
logFields = append(logFields,
zap.Duration("bootstrapDuration", bootstrapDuration))

bootstrapResult, err := process.Run(ctx, instrCtx.start, targets)
if err != nil {
m.log.Error("bootstrap failed",
append(logFields, zap.Error(err))...)
instrCtx.bootstrapFailed(err)
return err
}

m.log.Info("bootstrap succeeded, marking namespaces complete", logFields...)
instrCtx.bootstrapSucceeded()

instrCtx.bootstrapNamespacesStarted()
// Use a multi-error here because we want to at least bootstrap
// as many of the namespaces as possible.
multiErr := xerrors.NewMultiError()
Expand All @@ -358,29 +319,21 @@ func (m *bootstrapManager) bootstrap() error {
if !ok {
err := fmt.Errorf("missing namespace from bootstrap result: %v",
id.String())
i := m.opts.InstrumentOptions()
instrument.EmitAndLogInvariantViolation(i, func(l *zap.Logger) {
l.Error("bootstrap failed",
append(logFields, zap.Error(err))...)
})
instrCtx.emitAndLogInvariantViolation(err, "bootstrap failed")
return err
}

if err := namespace.Bootstrap(ctx, result); err != nil {
m.log.Info("bootstrap error", append(logFields, []zapcore.Field{
zap.String("namespace", id.String()),
zap.Error(err),
}...)...)
instrCtx.bootstrapNamespaceFailed(err, id)
multiErr = multiErr.Add(err)
}
}

if err := multiErr.FinalError(); err != nil {
m.log.Info("bootstrap namespaces failed",
append(logFields, zap.Error(err))...)
instrCtx.bootstrapNamespacesFailed(err)
return err
}

m.log.Info("bootstrap success", logFields...)
instrCtx.bootstrapNamespacesSucceeded()
return nil
}
1 change: 1 addition & 0 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package peers implements peers bootstrapping.
package peers

import (
Expand Down
Loading

0 comments on commit eb36d4b

Please sign in to comment.