Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional metrics for peers bootstrapper #3060

Merged
merged 13 commits on Jan 14, 2021
2 changes: 1 addition & 1 deletion src/dbnode/client/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -3857,7 +3857,7 @@ func (c *enqueueCh) enqueueDelayed(numToEnqueue int) (enqueueDelayedFn, enqueueD
return nil, nil, errEnqueueChIsClosed
}
c.sending++ // NB(r): This is decremented by calling the returned enqueue done function
c.enqueued += (numToEnqueue)
c.enqueued += numToEnqueue
c.Unlock()
return c.enqueueDelayedFn, c.enqueueDelayedDoneFn, nil
}
Expand Down
110 changes: 33 additions & 77 deletions src/dbnode/storage/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@ import (
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/instrument"
xtime "github.com/m3db/m3/src/x/time"

"github.com/uber-go/tally"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

var (
Expand Down Expand Up @@ -70,42 +66,38 @@ const (

type bootstrapFn func() error

// bootstrapNamespace pairs a databaseNamespace with a slice of databaseShard
// values. The bootstrap method allocates one entry per namespace.
// NOTE(review): presumably shards holds the shards to bootstrap for that
// namespace; confirm against the elided prepare loop in bootstrap().
type bootstrapNamespace struct {
namespace databaseNamespace
shards []databaseShard
}

type bootstrapManager struct {
sync.RWMutex

database database
mediator databaseMediator
opts Options
log *zap.Logger
bootstrapFn bootstrapFn
nowFn clock.NowFn
sleepFn sleepFn
processProvider bootstrap.ProcessProvider
state BootstrapState
hasPending bool
status tally.Gauge
bootstrapDuration tally.Timer
durableStatus tally.Gauge
sleepFn sleepFn
nowFn clock.NowFn
lastBootstrapCompletionTime xtime.UnixNano
instrumentation *bootstrapInstrumentation
}

func newBootstrapManager(
database database,
mediator databaseMediator,
opts Options,
) databaseBootstrapManager {
scope := opts.InstrumentOptions().MetricsScope()
m := &bootstrapManager{
database: database,
mediator: mediator,
opts: opts,
log: opts.InstrumentOptions().Logger(),
nowFn: opts.ClockOptions().NowFn(),
sleepFn: time.Sleep,
processProvider: opts.BootstrapProcessProvider(),
status: scope.Gauge("bootstrapped"),
bootstrapDuration: scope.Timer("bootstrap-duration"),
durableStatus: scope.Gauge("bootstrapped-durable"),
database: database,
mediator: mediator,
processProvider: opts.BootstrapProcessProvider(),
sleepFn: time.Sleep,
nowFn: opts.ClockOptions().NowFn(),
instrumentation: newBootstrapInstrumentation(opts),
}
m.bootstrapFn = m.bootstrap
return m
Expand Down Expand Up @@ -176,9 +168,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
// NB(r): Last bootstrap failed, since this could be due to transient
// failure we retry the bootstrap again. This is to avoid operators
// needing to manually intervene for cases where failures are transient.
m.log.Warn("retrying bootstrap after backoff",
zap.Duration("backoff", bootstrapRetryInterval),
zap.Int("numRetries", i+1))
m.instrumentation.bootstrapFnFailed(i + 1)
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
m.sleepFn(bootstrapRetryInterval)
continue
}
Expand All @@ -202,22 +192,8 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) {
}

func (m *bootstrapManager) Report() {
if m.IsBootstrapped() {
m.status.Update(1)
} else {
m.status.Update(0)
}

if m.database.IsBootstrappedAndDurable() {
m.durableStatus.Update(1)
} else {
m.durableStatus.Update(0)
}
}

type bootstrapNamespace struct {
namespace databaseNamespace
shards []databaseShard
m.instrumentation.setIsBootstrapped(m.IsBootstrapped())
m.instrumentation.setIsBootstrappedAndDurable(m.database.IsBootstrappedAndDurable())
}

func (m *bootstrapManager) bootstrap() error {
Expand All @@ -236,25 +212,21 @@ func (m *bootstrapManager) bootstrap() error {
return err
}

instrCtx := m.instrumentation.bootstrapPreparing()

accmulators := make([]bootstrap.NamespaceDataAccumulator, 0, len(namespaces))
defer func() {
// Close all accumulators at bootstrap completion, only error
// it returns is if already closed, so this is a code bug if ever
// an error returned.
for _, accumulator := range accmulators {
if err := accumulator.Close(); err != nil {
instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(),
func(l *zap.Logger) {
l.Error("could not close bootstrap data accumulator",
zap.Error(err))
})
instrument.EmitAndLogInvariantViolation(instrCtx.instrumentOptions,
instrCtx.logFn(err, "could not close bootstrap data accumulator"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps it would be better to add an emitAndLogInvariantViolation method to the instrumentation context, rather than passing it a function that takes a logger (and then remove the logFn implementation)?

}
}
}()

start := m.nowFn()
m.log.Info("bootstrap prepare")

var (
bootstrapNamespaces = make([]bootstrapNamespace, len(namespaces))
prepareWg sync.WaitGroup
Expand Down Expand Up @@ -288,7 +260,7 @@ func (m *bootstrapManager) bootstrap() error {
prepareWg.Wait()

if err := prepareMultiErr.FinalError(); err != nil {
m.log.Error("bootstrap prepare failed", zap.Error(err))
m.instrumentation.bootstrapPrepareFailed(err)
return err
}

Expand Down Expand Up @@ -329,26 +301,17 @@ func (m *bootstrapManager) bootstrap() error {
})
}

logFields := []zapcore.Field{
zap.Int("numShards", len(uniqueShards)),
}
m.log.Info("bootstrap started", logFields...)

instrCtx.bootstrapStarted(len(uniqueShards))
// Run the bootstrap.
bootstrapResult, err := process.Run(ctx, start, targets)

bootstrapDuration := m.nowFn().Sub(start)
m.bootstrapDuration.Record(bootstrapDuration)
logFields = append(logFields,
zap.Duration("bootstrapDuration", bootstrapDuration))

bootstrapResult, err := process.Run(ctx, instrCtx.start, targets)
if err != nil {
m.log.Error("bootstrap failed",
append(logFields, zap.Error(err))...)
instrCtx.bootstrapFailed(err)
return err
}

m.log.Info("bootstrap succeeded, marking namespaces complete", logFields...)
instrCtx.bootstrapSucceeded()

instrCtx.bootstrapNamespacesStarted()
// Use a multi-error here because we want to at least bootstrap
// as many of the namespaces as possible.
multiErr := xerrors.NewMultiError()
Expand All @@ -358,29 +321,22 @@ func (m *bootstrapManager) bootstrap() error {
if !ok {
err := fmt.Errorf("missing namespace from bootstrap result: %v",
id.String())
i := m.opts.InstrumentOptions()
instrument.EmitAndLogInvariantViolation(i, func(l *zap.Logger) {
l.Error("bootstrap failed",
append(logFields, zap.Error(err))...)
})
instrument.EmitAndLogInvariantViolation(instrCtx.instrumentOptions,
instrCtx.logFn(err, "bootstrap failed"))
return err
}

if err := namespace.Bootstrap(ctx, result); err != nil {
m.log.Info("bootstrap error", append(logFields, []zapcore.Field{
zap.String("namespace", id.String()),
zap.Error(err),
}...)...)
instrCtx.bootstrapNamespaceFailed(err, id)
multiErr = multiErr.Add(err)
}
}

if err := multiErr.FinalError(); err != nil {
m.log.Info("bootstrap namespaces failed",
append(logFields, zap.Error(err))...)
instrCtx.bootstrapNamespacesFailed(err)
return err
}

m.log.Info("bootstrap success", logFields...)
instrCtx.bootstrapNamespacesSucceeded()
return nil
}
1 change: 1 addition & 0 deletions src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// Package peers implements peers bootstrapping.
soundvibe marked this conversation as resolved.
Show resolved Hide resolved
package peers

import (
Expand Down
Loading