From a548480190606cafe675ea904cb1dac5db25a1cd Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 4 Jan 2021 16:40:46 +0200 Subject: [PATCH 01/10] refactored instrumentation code for peers bootstrapper. added additional timers for peers bootstrapper to track individual steps. --- src/dbnode/storage/bootstrap.go | 223 ++++++++---- .../bootstrap/bootstrapper/peers/source.go | 342 +++++++++++++----- src/dbnode/storage/bootstrap_test.go | 2 +- 3 files changed, 394 insertions(+), 173 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index a154e09493..59cba42206 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -70,23 +70,142 @@ const ( type bootstrapFn func() error -type bootstrapManager struct { - sync.RWMutex - - database database - mediator databaseMediator +type instrumentation struct { opts Options log *zap.Logger - bootstrapFn bootstrapFn nowFn clock.NowFn sleepFn sleepFn - processProvider bootstrap.ProcessProvider - state BootstrapState - hasPending bool status tally.Gauge bootstrapDuration tally.Timer + bootstrapNamespacesDuration tally.Timer durableStatus tally.Gauge lastBootstrapCompletionTime xtime.UnixNano + start time.Time + startNamespaces time.Time + logFields []zapcore.Field +} + +func (i *instrumentation) bootstrapFnFailed(retry int) { + i.log.Warn("retrying bootstrap after backoff", + zap.Duration("backoff", bootstrapRetryInterval), + zap.Int("numRetries", retry+1)) + i.sleepFn(bootstrapRetryInterval) +} + +func (i *instrumentation) bootstrapPreparing() { + i.start = i.nowFn() + i.log.Info("bootstrap prepare") +} + +func (i *instrumentation) bootstrapPrepareFailed(err error) { + i.log.Error("bootstrap prepare failed", zap.Error(err)) +} + +func (i *instrumentation) bootstrapStarted(shards int) { + i.logFields = []zapcore.Field{ + zap.Int("numShards", shards), + } + i.log.Info("bootstrap started", i.logFields...) +} + +func (i *instrumentation) bootstrapSucceeded() { + bootstrapDuration := i.nowFn().Sub(i.start) + i.bootstrapDuration.Record(bootstrapDuration) + i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) + i.log.Info("bootstrap succeeded, marking namespaces complete", i.logFields...) +} + +func (i *instrumentation) bootstrapFailed(err error) { + bootstrapDuration := i.nowFn().Sub(i.start) + i.bootstrapDuration.Record(bootstrapDuration) + i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) + i.log.Error("bootstrap failed", append(i.logFields, zap.Error(err))...) +} + +func (i *instrumentation) bootstrapNamespaceFailed(err error, namespaceId string) { + i.log.Info("bootstrap namespace error", append(i.logFields, []zapcore.Field{ + zap.String("namespace", namespaceId), + zap.Error(err), + }...)...) +} + +func (i *instrumentation) bootstrapNamespacesFailed(err error) { + duration := i.nowFn().Sub(i.startNamespaces) + i.bootstrapNamespacesDuration.Record(duration) + i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) + i.log.Info("bootstrap namespaces failed", append(i.logFields, zap.Error(err))...) +} + +func (i *instrumentation) bootstrapNamespacesStarted() { + i.startNamespaces = i.nowFn() + i.log.Info("bootstrap namespaces start", i.logFields...) +} + +func (i *instrumentation) bootstrapNamespacesSucceeded() { + duration := i.nowFn().Sub(i.startNamespaces) + i.bootstrapNamespacesDuration.Record(duration) + i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) + i.log.Info("bootstrap namespaces success", i.logFields...) +} + +func (i *instrumentation) bootstrapCompletion() { + i.lastBootstrapCompletionTime = xtime.ToUnixNano(i.nowFn()) +} + +func (i *instrumentation) setIsBootstrapped(isBootstrapped bool) { + if isBootstrapped { + i.status.Update(1) + } else { + i.status.Update(0) + } +} + +func (i *instrumentation) setIsBootstrappedAndDurable(isBootstrappedAndDurable bool) { + if isBootstrappedAndDurable { + i.durableStatus.Update(1) + } else { + i.durableStatus.Update(0) + } +} + +func (i *instrumentation) missingNamespaceFromResult(err error) { + instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), func(l *zap.Logger) { + l.Error("bootstrap failed", append(i.logFields, zap.Error(err))...) + }) +} + +func (i *instrumentation) bootstrapDataAccumulatorCloseFailed(err error) { + instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("could not close bootstrap data accumulator", + zap.Error(err)) + }) +} + +func newInstrumentation(opts Options) *instrumentation { + scope := opts.InstrumentOptions().MetricsScope() + return &instrumentation{ + opts: opts, + log: opts.InstrumentOptions().Logger(), + nowFn: opts.ClockOptions().NowFn(), + sleepFn: time.Sleep, + status: scope.Gauge("bootstrapped"), + bootstrapDuration: scope.Timer("bootstrap-duration"), + bootstrapNamespacesDuration: scope.Timer("bootstrap-namespaces-duration"), + durableStatus: scope.Gauge("bootstrapped-durable"), + } +} + +type bootstrapManager struct { + sync.RWMutex + + database database + mediator databaseMediator + bootstrapFn bootstrapFn + processProvider bootstrap.ProcessProvider + state BootstrapState + hasPending bool + instrumentation *instrumentation } func newBootstrapManager( @@ -94,18 +213,11 @@ func newBootstrapManager( mediator databaseMediator, opts Options, ) databaseBootstrapManager { - scope := opts.InstrumentOptions().MetricsScope() m := &bootstrapManager{ - database: database, - mediator: mediator, - opts: opts, - log: opts.InstrumentOptions().Logger(), - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - processProvider: opts.BootstrapProcessProvider(), - status: scope.Gauge("bootstrapped"), - bootstrapDuration: scope.Timer("bootstrap-duration"), - durableStatus: scope.Gauge("bootstrapped-durable"), + database: database, + mediator: mediator, + processProvider: opts.BootstrapProcessProvider(), + instrumentation: newInstrumentation(opts), } m.bootstrapFn = m.bootstrap return m @@ -120,7 +232,7 @@ func (m *bootstrapManager) IsBootstrapped() bool { func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) { m.RLock() - bsTime := m.lastBootstrapCompletionTime + bsTime := m.instrumentation.lastBootstrapCompletionTime m.RUnlock() return bsTime, bsTime > 0 } @@ -176,10 +288,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // NB(r): Last bootstrap failed, since this could be due to transient // failure we retry the bootstrap again. This is to avoid operators // needing to manually intervene for cases where failures are transient. - m.log.Warn("retrying bootstrap after backoff", - zap.Duration("backoff", bootstrapRetryInterval), - zap.Int("numRetries", i+1)) - m.sleepFn(bootstrapRetryInterval) + m.instrumentation.bootstrapFnFailed(i + 1) continue } @@ -196,23 +305,14 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // on its own course so that the load of ticking and flushing is more spread out // across the cluster. m.Lock() - m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn()) + m.instrumentation.bootstrapCompletion() m.Unlock() return result, nil } func (m *bootstrapManager) Report() { - if m.IsBootstrapped() { - m.status.Update(1) - } else { - m.status.Update(0) - } - - if m.database.IsBootstrappedAndDurable() { - m.durableStatus.Update(1) - } else { - m.durableStatus.Update(0) - } + m.instrumentation.setIsBootstrapped(m.IsBootstrapped()) + m.instrumentation.setIsBootstrappedAndDurable(m.database.IsBootstrappedAndDurable()) } type bootstrapNamespace struct { @@ -243,18 +343,12 @@ func (m *bootstrapManager) bootstrap() error { // an error returned. for _, accumulator := range accmulators { if err := accumulator.Close(); err != nil { - instrument.EmitAndLogInvariantViolation(m.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("could not close bootstrap data accumulator", - zap.Error(err)) - }) + m.instrumentation.bootstrapDataAccumulatorCloseFailed(err) } } }() - start := m.nowFn() - m.log.Info("bootstrap prepare") - + m.instrumentation.bootstrapPreparing() var ( bootstrapNamespaces = make([]bootstrapNamespace, len(namespaces)) prepareWg sync.WaitGroup @@ -288,7 +382,7 @@ func (m *bootstrapManager) bootstrap() error { prepareWg.Wait() if err := prepareMultiErr.FinalError(); err != nil { - m.log.Error("bootstrap prepare failed", zap.Error(err)) + m.instrumentation.bootstrapPrepareFailed(err) return err } @@ -329,28 +423,19 @@ func (m *bootstrapManager) bootstrap() error { }) } - logFields := []zapcore.Field{ - zap.Int("numShards", len(uniqueShards)), - } - m.log.Info("bootstrap started", logFields...) - + m.instrumentation.bootstrapStarted(len(uniqueShards)) // Run the bootstrap. - bootstrapResult, err := process.Run(ctx, start, targets) - - bootstrapDuration := m.nowFn().Sub(start) - m.bootstrapDuration.Record(bootstrapDuration) - logFields = append(logFields, - zap.Duration("bootstrapDuration", bootstrapDuration)) - + bootstrapResult, err := process.Run(ctx, m.instrumentation.start, targets) if err != nil { - m.log.Error("bootstrap failed", - append(logFields, zap.Error(err))...) + m.instrumentation.bootstrapFailed(err) return err } - m.log.Info("bootstrap succeeded, marking namespaces complete", logFields...) + m.instrumentation.bootstrapSucceeded() // Use a multi-error here because we want to at least bootstrap // as many of the namespaces as possible. + + m.instrumentation.bootstrapNamespacesStarted() multiErr := xerrors.NewMultiError() for _, namespace := range namespaces { id := namespace.ID() @@ -358,29 +443,21 @@ func (m *bootstrapManager) bootstrap() error { if !ok { err := fmt.Errorf("missing namespace from bootstrap result: %v", id.String()) - i := m.opts.InstrumentOptions() - instrument.EmitAndLogInvariantViolation(i, func(l *zap.Logger) { - l.Error("bootstrap failed", - append(logFields, zap.Error(err))...) - }) + m.instrumentation.missingNamespaceFromResult(err) return err } if err := namespace.Bootstrap(ctx, result); err != nil { - m.log.Info("bootstrap error", append(logFields, []zapcore.Field{ - zap.String("namespace", id.String()), - zap.Error(err), - }...)...) + m.instrumentation.bootstrapNamespaceFailed(err, id.String()) multiErr = multiErr.Add(err) } } if err := multiErr.FinalError(); err != nil { - m.log.Info("bootstrap namespaces failed", - append(logFields, zap.Error(err))...) + m.instrumentation.bootstrapNamespacesFailed(err) return err } - m.log.Info("bootstrap success", logFields...) + m.instrumentation.bootstrapNamespacesSucceeded() return nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index a0bf66d87b..f5bc15c9b6 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -59,15 +59,12 @@ import ( ) type peersSource struct { - opts Options - log *zap.Logger + opts Options + // log *zap.Logger newPersistManager func() (persist.Manager, error) - nowFn clock.NowFn - metrics peersSourceMetrics -} - -type peersSourceMetrics struct { - persistedIndexBlocksOutOfRetention tally.Counter + // nowFn clock.NowFn + // metrics peersSourceMetrics + instrumentation *instrumentation } type persistenceFlush struct { @@ -77,24 +74,209 @@ type persistenceFlush struct { timeRange xtime.Range } +type instrumentation struct { + opts Options + log *zap.Logger + nowFn clock.NowFn + bootstrapDataDuration tally.Timer + bootstrapIndexDuration tally.Timer + bootstrapShardsDuration tally.Timer + persistedIndexBlocksOutOfRetention tally.Counter + start time.Time + startShards time.Time + logFields []zapcore.Field +} + +func (i *instrumentation) bootstrapDataStarted(span opentracing.Span) { + i.start = i.nowFn() + i.log.Info("bootstrapping time series data start") + span.LogEvent("bootstrap_data_start") +} + +func (i *instrumentation) bootstrapDataCompleted(span opentracing.Span) { + duration := i.nowFn().Sub(i.start) + i.bootstrapDataDuration.Record(duration) + i.log.Info("bootstrapping time series data success", zap.Duration("took", duration)) + span.LogEvent("bootstrap_data_done") +} + +func (i *instrumentation) bootstrapIndexStarted(span opentracing.Span) { + i.start = i.nowFn() + i.log.Info("bootstrapping index metadata start") + span.LogEvent("bootstrap_index_start") +} + +func (i *instrumentation) bootstrapIndexCompleted(span opentracing.Span) { + duration := i.nowFn().Sub(i.start) + i.bootstrapIndexDuration.Record(duration) + i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration)) + span.LogEvent("bootstrap_index_done") +} + +func (i *instrumentation) bootstrapIndexSkipped(namespaceId ident.ID) { + i.log.Info("skipping bootstrap for namespace based on options", + zap.Stringer("namespace", namespaceId)) +} + +func (i *instrumentation) getDefaultAdminSessionFailed(err error) { + i.log.Error("peers bootstrapper cannot get default admin session", zap.Error(err)) +} + +func (i *instrumentation) bootstrapShardsStarted(count int, concurrency int, shouldPersist bool) { + i.startShards = i.nowFn() + i.log.Info("peers bootstrapper bootstrapping shards for ranges", + zap.Int("shards", count), + zap.Int("concurrency", concurrency), + zap.Bool("shouldPersist", shouldPersist)) +} + +func (i *instrumentation) bootstrapShardsCompleted() { + duration := i.nowFn().Sub(i.startShards) + i.bootstrapShardsDuration.Record(duration) + i.log.Info("bootstrapping shards success", zap.Duration("took", duration)) +} + +func (i *instrumentation) persistenceFlushFailed(err error) { + i.log.Error("peers bootstrapper bootstrap with persistence flush encountered error", + zap.Error(err)) +} + +func (i *instrumentation) seriesCheckoutFailed(err error) { + i.log.Error("could not checkout series", zap.Error(err)) +} + +func (i *instrumentation) seriesLoadFailed(err error) { + i.log.Error("could not load series block", zap.Error(err)) +} + +func (i *instrumentation) shardBootstrapped(shard uint32, numSeries int64, blockTime time.Time) { + i.log.Info("peer bootstrapped shard", + zap.Uint32("shard", shard), + zap.Int64("numSeries", numSeries), + zap.Time("blockStart", blockTime), + ) +} + +func (i *instrumentation) fetchBootstrapBlocksFailed(err error, shard uint32) { + i.log.Error("error fetching bootstrap blocks", + zap.Uint32("shard", shard), + zap.Error(err), + ) +} + +func (i *instrumentation) peersBootstrapperIndexForRanges(count int) { + i.log.Info("peers bootstrapper bootstrapping index for ranges", + zap.Int("shards", count), + ) +} + +func (i *instrumentation) processingReadersFailed(err error, start time.Time) { + i.log.Error("error processing readers", zap.Error(err), + zap.Time("timeRange.start", start)) +} + +func (i *instrumentation) buildingFileSetIndexSegmentStarted(fields []zapcore.Field) { + i.log.Debug("building file set index segment", fields...) +} + +func (i *instrumentation) outOfRetentionIndexSegmentSkipped(fields []zapcore.Field) { + i.log.Debug("skipping out of retention index segment", fields...) + i.persistedIndexBlocksOutOfRetention.Inc(1) +} + +func (i *instrumentation) buildingInMemoryIndexSegmentStarted(fields []zapcore.Field) { + i.log.Info("building in-memory index segment", fields...) +} + +func (i *instrumentation) errorsForRangeEncountered(summaryString string, errorsString []string) { + i.log.Info("encountered errors for range", + zap.String("requestedRanges", summaryString), + zap.Strings("timesWithErrors", errorsString)) +} + +func (i *instrumentation) noPeersAvailable(total int, shardIDUint uint32) { + i.log.Debug( + "0 available peers, unable to peer bootstrap", + zap.Int("total", total), zap.Uint32("shard", shardIDUint)) +} + +func (i *instrumentation) readConsistencyNotAchieved( + bootstrapConsistencyLevel topology.ReadConsistencyLevel, + majorityReplicas int, + total int, + available int, +) { + i.log.Debug( + "read consistency not achieved, unable to peer bootstrap", + zap.Any("level", bootstrapConsistencyLevel), + zap.Int("replicas", majorityReplicas), + zap.Int("total", total), + zap.Int("available", available)) +} + +func (i *instrumentation) peersBootstrapperSourceReadStarted(ctx context.Context) ( + context.Context, opentracing.Span, bool) { + return ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) +} + +func (i *instrumentation) persistFsIndexBootstrapFailed(err error, + iopts instrument.Options, + id ident.ID, + ranges result.ShardTimeRanges) { + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("persist fs index bootstrap failed", + zap.Stringer("namespace", id), + zap.Stringer("requestedRanges", ranges), + zap.Error(err)) + }) +} + +func (i *instrumentation) buildFsIndexBootstrapFailed(err error, + iopts instrument.Options, + id ident.ID, + ranges result.ShardTimeRanges) { + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("build fs index bootstrap failed", + zap.Stringer("namespace", id), + zap.Stringer("requestedRanges", ranges), + zap.Error(err)) + }) +} + +func newInstrumentation(opts Options) *instrumentation { + instrumentOptions := opts.ResultOptions().InstrumentOptions() + scope := instrumentOptions.MetricsScope().SubScope("peers-bootstrapper") + instrumentOptions = instrumentOptions.SetMetricsScope(scope) + return &instrumentation{ + opts: opts, + log: instrumentOptions.Logger().With(zap.String("bootstrapper", "peers")), + nowFn: opts.ResultOptions().ClockOptions().NowFn(), + bootstrapDataDuration: scope.Timer("peer-bootstrap-data-duration"), + bootstrapIndexDuration: scope.Timer("peer-bootstrap-index-duration"), + bootstrapShardsDuration: scope.Timer("peer-bootstrap-shards-duration"), + persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), + } +} + func newPeersSource(opts Options) (bootstrap.Source, error) { if err := opts.Validate(); err != nil { return nil, err } - iopts := opts.ResultOptions().InstrumentOptions() - scope := iopts.MetricsScope().SubScope("peers-bootstrapper") - iopts = iopts.SetMetricsScope(scope) + // iopts := opts.ResultOptions().InstrumentOptions() + // scope := iopts.MetricsScope().SubScope("peers-bootstrapper") + // iopts = iopts.SetMetricsScope(scope) return &peersSource{ opts: opts, - log: iopts.Logger().With(zap.String("bootstrapper", "peers")), + // log: iopts.Logger().With(zap.String("bootstrapper", "peers")), newPersistManager: func() (persist.Manager, error) { return fs.NewPersistManager(opts.FilesystemOptions()) }, - nowFn: opts.ResultOptions().ClockOptions().NowFn(), - metrics: peersSourceMetrics{ - persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), - }, + /* nowFn: opts.ResultOptions().ClockOptions().NowFn(), + metrics: peersSourceMetrics{ + persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), + },*/ + instrumentation: newInstrumentation(opts), }, nil } @@ -132,7 +314,7 @@ func (s *peersSource) Read( namespaces bootstrap.Namespaces, cache bootstrap.Cache, ) (bootstrap.NamespaceResults, error) { - ctx, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) + ctx, span, _ := s.instrumentation.peersBootstrapperSourceReadStarted(ctx) defer span.Finish() timeRangesEmpty := true @@ -157,10 +339,8 @@ func (s *peersSource) Read( } // NB(r): Perform all data bootstrapping first then index bootstrapping - // to more clearly deliniate which process is slower than the other. - start := s.nowFn() - s.log.Info("bootstrapping time series data start") - span.LogEvent("bootstrap_data_start") + // to more clearly delineate which process is slower than the other. + s.instrumentation.bootstrapDataStarted(span) for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata @@ -178,24 +358,17 @@ func (s *peersSource) Read( DataResult: r, }) } - s.log.Info("bootstrapping time series data success", - zap.Duration("took", s.nowFn().Sub(start))) - span.LogEvent("bootstrap_data_done") - + s.instrumentation.bootstrapDataCompleted(span) // NB(bodu): We need to evict the info file cache before reading index data since we've // maybe fetched blocks from peers so the cached info file state is now stale. cache.Evict() - start = s.nowFn() - s.log.Info("bootstrapping index metadata start") - span.LogEvent("bootstrap_index_start") + + s.instrumentation.bootstrapIndexStarted(span) for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata if !md.Options().IndexOptions().Enabled() { - s.log.Info("skipping bootstrap for namespace based on options", - zap.Stringer("namespace", md.ID())) - - // Not bootstrapping for index. + s.instrumentation.bootstrapIndexSkipped(md.ID()) continue } @@ -220,9 +393,7 @@ func (s *peersSource) Read( results.Results.Set(md.ID(), result) } - s.log.Info("bootstrapping index metadata success", - zap.Duration("took", s.nowFn().Sub(start))) - span.LogEvent("bootstrap_index_done") + s.instrumentation.bootstrapIndexCompleted(span) return results, nil } @@ -257,7 +428,7 @@ func (s *peersSource) readData( result := result.NewDataBootstrapResult() session, err := s.opts.AdminClient().DefaultAdminSession() if err != nil { - s.log.Error("peers bootstrapper cannot get default admin session", zap.Error(err)) + s.instrumentation.getDefaultAdminSessionFailed(err) result.SetUnfulfilled(shardTimeRanges) return nil, err } @@ -277,10 +448,7 @@ func (s *peersSource) readData( concurrency = s.opts.ShardPersistenceConcurrency() } - s.log.Info("peers bootstrapper bootstrapping shards for ranges", - zap.Int("shards", count), - zap.Int("concurrency", concurrency), - zap.Bool("shouldPersist", shouldPersist)) + s.instrumentation.bootstrapShardsStarted(count, concurrency, shouldPersist) if shouldPersist { // Spin up persist workers. for i := 0; i < s.opts.ShardPersistenceFlushConcurrency(); i++ { @@ -324,6 +492,7 @@ func (s *peersSource) readData( } } + s.instrumentation.bootstrapShardsCompleted() return result, nil } @@ -382,8 +551,7 @@ func (s *peersSource) runPersistenceQueueWorkerLoop( } // Remove results and make unfulfilled if an error occurred. - s.log.Error("peers bootstrapper bootstrap with persistence flush encountered error", - zap.Error(err)) + s.instrumentation.persistenceFlushFailed(err) // Make unfulfilled. lock.Lock() @@ -461,14 +629,14 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( continue } unfulfill(currRange) - s.log.Error("could not checkout series", zap.Error(err)) + s.instrumentation.seriesCheckoutFailed(err) continue } for _, block := range entry.Blocks.AllBlocks() { if err := ref.Series.LoadBlock(block, series.WarmWrite); err != nil { unfulfill(currRange) - s.log.Error("could not load series block", zap.Error(err)) + s.instrumentation.seriesLoadFailed(err) } } @@ -485,27 +653,21 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( shardResult result.ShardResult, err error, ) { - if err == nil { - shardBlockSeriesCounter := map[xtime.UnixNano]int64{} - for _, entry := range shardResult.AllSeries().Iter() { - series := entry.Value() - for blockStart := range series.Blocks.AllBlocks() { - shardBlockSeriesCounter[blockStart]++ - } - } + if err != nil { + s.instrumentation.fetchBootstrapBlocksFailed(err, shard) + return + } - for block, numSeries := range shardBlockSeriesCounter { - s.log.Info("peer bootstrapped shard", - zap.Uint32("shard", shard), - zap.Int64("numSeries", numSeries), - zap.Time("blockStart", block.ToTime()), - ) + shardBlockSeriesCounter := map[xtime.UnixNano]int64{} + for _, entry := range shardResult.AllSeries().Iter() { + series := entry.Value() + for blockStart := range series.Blocks.AllBlocks() { + shardBlockSeriesCounter[blockStart]++ } - } else { - s.log.Error("error fetching bootstrap blocks", - zap.Uint32("shard", shard), - zap.Error(err), - ) + } + + for block, numSeries := range shardBlockSeriesCounter { + s.instrumentation.shardBootstrapped(shard, numSeries, block.ToTime()) } } @@ -717,9 +879,7 @@ func (s *peersSource) readIndex( indexSegmentConcurrency = s.opts.IndexSegmentConcurrency() readersCh = make(chan bootstrapper.TimeWindowReaders, indexSegmentConcurrency) ) - s.log.Info("peers bootstrapper bootstrapping index for ranges", - zap.Int("shards", count), - ) + s.instrumentation.peersBootstrapperIndexForRanges(count) go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{ NsMD: ns, @@ -733,9 +893,9 @@ func (s *peersSource) readIndex( // NB(bodu): We only read metadata when performing a peers bootstrap // so we do not need to sort the data fileset reader. OptimizedReadMetadataOnly: true, - Logger: s.log, + Logger: s.instrumentation.log, Span: span, - NowFn: s.nowFn, + NowFn: s.instrumentation.nowFn, Cache: cache, }) @@ -899,8 +1059,7 @@ func (s *peersSource) processReaders( xtime.NewRanges(timeRange), )) } else { - s.log.Error("error processing readers", zap.Error(err), - zap.Time("timeRange.start", start)) + s.instrumentation.processingReadersFailed(err, start) timesWithErrors = append(timesWithErrors, timeRange.Start) } } @@ -944,7 +1103,7 @@ func (s *peersSource) processReaders( zap.String("remainingRanges", remainingRanges.SummaryString()), } if shouldPersist { - s.log.Debug("building file set index segment", buildIndexLogFields...) + s.instrumentation.buildingFileSetIndexSegmentStarted(buildIndexLogFields) indexBlock, err = bootstrapper.PersistBootstrapIndexSegment( ns, requestedRanges, @@ -960,19 +1119,13 @@ func (s *peersSource) processReaders( // Bail early if the index segment is already out of retention. // This can happen when the edge of requested ranges at time of data bootstrap // is now out of retention. - s.log.Debug("skipping out of retention index segment", buildIndexLogFields...) - s.metrics.persistedIndexBlocksOutOfRetention.Inc(1) + s.instrumentation.outOfRetentionIndexSegmentSkipped(buildIndexLogFields) return remainingRanges, timesWithErrors } else if err != nil { - instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { - l.Error("persist fs index bootstrap failed", - zap.Stringer("namespace", ns.ID()), - zap.Stringer("requestedRanges", requestedRanges), - zap.Error(err)) - }) + s.instrumentation.persistFsIndexBootstrapFailed(err, iopts, ns.ID(), requestedRanges) } } else { - s.log.Info("building in-memory index segment", buildIndexLogFields...) + s.instrumentation.buildingInMemoryIndexSegmentStarted(buildIndexLogFields) indexBlock, err = bootstrapper.BuildBootstrapIndexSegment( ns, requestedRanges, @@ -984,13 +1137,7 @@ func (s *peersSource) processReaders( blockEnd, ) if err != nil { - iopts := s.opts.ResultOptions().InstrumentOptions() - instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { - l.Error("build fs index bootstrap failed", - zap.Stringer("namespace", ns.ID()), - zap.Stringer("requestedRanges", requestedRanges), - zap.Error(err)) - }) + s.instrumentation.buildFsIndexBootstrapFailed(err, iopts, ns.ID(), requestedRanges) } } @@ -1060,9 +1207,9 @@ func (s *peersSource) markRunResultErrorsAndUnfulfilled( for i := range timesWithErrors { timesWithErrorsString[i] = timesWithErrors[i].String() } - s.log.Info("encounted errors for range", - zap.String("requestedRanges", requestedRanges.SummaryString()), - zap.Strings("timesWithErrors", timesWithErrorsString)) + s.instrumentation.errorsForRangeEncountered( + remainingRanges.SummaryString(), + timesWithErrorsString) } if !remainingRanges.IsEmpty() { @@ -1140,20 +1287,17 @@ func (s *peersSource) peerAvailability( if available == 0 { // Can't peer bootstrap if there are no available peers. - s.log.Debug( - "0 available peers, unable to peer bootstrap", - zap.Int("total", total), zap.Uint32("shard", shardIDUint)) + s.instrumentation.noPeersAvailable(total, shardIDUint) continue } if !topology.ReadConsistencyAchieved( bootstrapConsistencyLevel, majorityReplicas, total, available) { - s.log.Debug( - "read consistency not achieved, unable to peer bootstrap", - zap.Any("level", bootstrapConsistencyLevel), - zap.Int("replicas", majorityReplicas), - zap.Int("total", total), - zap.Int("available", available)) + s.instrumentation.readConsistencyNotAchieved( + bootstrapConsistencyLevel, + majorityReplicas, + total, + available) continue } diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 95337e996f..1faebed605 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -63,7 +63,7 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { bsm := newBootstrapManager(db, m, opts).(*bootstrapManager) // Don't sleep. - bsm.sleepFn = func(time.Duration) {} + bsm.instrumentation.sleepFn = func(time.Duration) {} gomock.InOrder( ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil), From 75363145e44b9bb7a7cc161aee120d634f0e4309 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Tue, 5 Jan 2021 10:42:48 +0200 Subject: [PATCH 02/10] fixed linter warnings --- src/dbnode/storage/bootstrap.go | 4 +- .../bootstrap/bootstrapper/peers/source.go | 48 +++++++++---------- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index 59cba42206..bd3729bafe 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -122,9 +122,9 @@ func (i *instrumentation) bootstrapFailed(err error) { i.log.Error("bootstrap failed", append(i.logFields, zap.Error(err))...) } -func (i *instrumentation) bootstrapNamespaceFailed(err error, namespaceId string) { +func (i *instrumentation) bootstrapNamespaceFailed(err error, namespaceID string) { i.log.Info("bootstrap namespace error", append(i.logFields, []zapcore.Field{ - zap.String("namespace", namespaceId), + zap.String("namespace", namespaceID), zap.Error(err), }...)...) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index bb433278a7..2bc8e0286e 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -28,6 +28,7 @@ import ( "time" "github.com/opentracing/opentracing-go" + opentracinglog "github.com/opentracing/opentracing-go/log" "github.com/uber-go/tally" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -84,38 +85,52 @@ type instrumentation struct { persistedIndexBlocksOutOfRetention tally.Counter start time.Time startShards time.Time - logFields []zapcore.Field +} + +func newInstrumentation(opts Options) *instrumentation { + instrumentOptions := opts.ResultOptions().InstrumentOptions() + scope := instrumentOptions.MetricsScope().SubScope("peers-bootstrapper") + instrumentOptions = instrumentOptions.SetMetricsScope(scope) + return &instrumentation{ + opts: opts, + log: instrumentOptions.Logger().With(zap.String("bootstrapper", "peers")), + nowFn: opts.ResultOptions().ClockOptions().NowFn(), + bootstrapDataDuration: scope.Timer("peer-bootstrap-data-duration"), + bootstrapIndexDuration: scope.Timer("peer-bootstrap-index-duration"), + bootstrapShardsDuration: scope.Timer("peer-bootstrap-shards-duration"), + persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), + } } func (i *instrumentation) bootstrapDataStarted(span opentracing.Span) { i.start = i.nowFn() i.log.Info("bootstrapping time series data start") - span.LogEvent("bootstrap_data_start") + span.LogFields(opentracinglog.String("event", "bootstrap_data_start")) } func (i *instrumentation) bootstrapDataCompleted(span opentracing.Span) { duration := i.nowFn().Sub(i.start) i.bootstrapDataDuration.Record(duration) i.log.Info("bootstrapping time series data success", zap.Duration("took", duration)) - span.LogEvent("bootstrap_data_done") + span.LogFields(opentracinglog.String("event", "bootstrap_data_done")) } func (i *instrumentation) bootstrapIndexStarted(span opentracing.Span) { i.start = i.nowFn() i.log.Info("bootstrapping index metadata start") - span.LogEvent("bootstrap_index_start") + span.LogFields(opentracinglog.String("event", "bootstrap_index_start")) } func (i *instrumentation) bootstrapIndexCompleted(span opentracing.Span) { duration := i.nowFn().Sub(i.start) i.bootstrapIndexDuration.Record(duration) i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration)) - span.LogEvent("bootstrap_index_done") + span.LogFields(opentracinglog.String("event", "bootstrap_index_done")) } -func (i *instrumentation) bootstrapIndexSkipped(namespaceId ident.ID) { +func (i *instrumentation) bootstrapIndexSkipped(namespaceID ident.ID) { i.log.Info("skipping bootstrap for namespace based on options", - zap.Stringer("namespace", namespaceId)) + zap.Stringer("namespace", namespaceID)) } func (i *instrumentation) getDefaultAdminSessionFailed(err error) { @@ -243,21 +258,6 @@ func (i *instrumentation) buildFsIndexBootstrapFailed(err error, }) } -func newInstrumentation(opts Options) *instrumentation { - instrumentOptions := opts.ResultOptions().InstrumentOptions() - scope := instrumentOptions.MetricsScope().SubScope("peers-bootstrapper") - instrumentOptions = instrumentOptions.SetMetricsScope(scope) - return &instrumentation{ - opts: opts, - log: instrumentOptions.Logger().With(zap.String("bootstrapper", "peers")), - nowFn: opts.ResultOptions().ClockOptions().NowFn(), - bootstrapDataDuration: scope.Timer("peer-bootstrap-data-duration"), - bootstrapIndexDuration: scope.Timer("peer-bootstrap-index-duration"), - bootstrapShardsDuration: scope.Timer("peer-bootstrap-shards-duration"), - persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), - } -} - func newPeersSource(opts Options) (bootstrap.Source, error) { if err := opts.Validate(); err != nil { return nil, err @@ -314,7 +314,7 @@ func (s *peersSource) Read( namespaces bootstrap.Namespaces, cache bootstrap.Cache, ) (bootstrap.NamespaceResults, error) { - ctx, span, _ := s.instrumentation.peersBootstrapperSourceReadStarted(ctx) + _, span, _ := s.instrumentation.peersBootstrapperSourceReadStarted(ctx) defer span.Finish() timeRangesEmpty := true @@ -659,7 +659,7 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( } shardBlockSeriesCounter := map[xtime.UnixNano]int64{} - for _, entry := range shardResult.AllSeries().Iter() { + for _, entry := range shardResult.AllSeries().Iter() { // nolint series := entry.Value() for blockStart := range series.Blocks.AllBlocks() { shardBlockSeriesCounter[blockStart]++ From 0599c9c7422644f753c23f6a75a7f176f50996da Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 6 Jan 2021 18:40:07 +0200 Subject: [PATCH 03/10] removed dead code --- src/dbnode/storage/bootstrap/bootstrapper/peers/source.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 2bc8e0286e..5eaf2ed7f6 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -263,19 +263,11 @@ func newPeersSource(opts Options) (bootstrap.Source, error) { return nil, err } - // iopts := opts.ResultOptions().InstrumentOptions() - // scope := iopts.MetricsScope().SubScope("peers-bootstrapper") - // iopts = iopts.SetMetricsScope(scope) return &peersSource{ opts: opts, - // log: iopts.Logger().With(zap.String("bootstrapper", "peers")), newPersistManager: func() (persist.Manager, error) { return fs.NewPersistManager(opts.FilesystemOptions()) }, - /* nowFn: opts.ResultOptions().ClockOptions().NowFn(), - metrics: peersSourceMetrics{ - persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), - },*/ instrumentation: newInstrumentation(opts), }, nil } From 018dc1dae72afa4c7ad67607964904eaef33f369 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 6 Jan 2021 18:48:22 +0200 Subject: [PATCH 04/10] removed dead code --- src/dbnode/storage/bootstrap/bootstrapper/peers/source.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 5eaf2ed7f6..bef2d987ec 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -61,10 +61,7 @@ import ( type peersSource struct { opts Options - // log *zap.Logger newPersistManager func() (persist.Manager, error) - // nowFn clock.NowFn - // metrics peersSourceMetrics instrumentation *instrumentation } From ff60333a9bf15255a6afbdbae9c3041e04a2771a Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Thu, 7 Jan 2021 16:30:23 +0200 Subject: [PATCH 05/10] changes after review --- src/dbnode/client/session.go | 4 +- src/dbnode/storage/bootstrap.go | 190 +++------------ .../bootstrap/bootstrapper/peers/peers.go | 2 +- .../bootstrap/bootstrapper/peers/source.go | 221 ++---------------- .../peers/source_instrumentation.go | 208 +++++++++++++++++ .../storage/bootstrap_instrumentation.go | 145 ++++++++++++ src/dbnode/storage/bootstrap_test.go | 2 +- 7 files changed, 417 insertions(+), 355 deletions(-) create mode 100644 src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go create mode 100644 src/dbnode/storage/bootstrap_instrumentation.go diff --git a/src/dbnode/client/session.go b/src/dbnode/client/session.go index e1de530ebe..7b421ebb34 100644 --- a/src/dbnode/client/session.go +++ b/src/dbnode/client/session.go @@ -937,7 +937,7 @@ func (s *session) setTopologyWithLock(topoMap topology.Map, queues []hostQueue, if s.pools.multiReaderIteratorArray == nil { s.pools.multiReaderIteratorArray = encoding.NewMultiReaderIteratorArrayPool([]pool.Bucket{ - pool.Bucket{ + { Capacity: replicas, Count: s.opts.SeriesIteratorPoolSize(), }, @@ -3853,7 +3853,7 @@ func (c *enqueueCh) enqueueDelayed(numToEnqueue int) (enqueueDelayedFn, enqueueD return nil, nil, errEnqueueChIsClosed } c.sending++ // NB(r): This is decremented by calling the returned enqueue done function - c.enqueued += (numToEnqueue) + c.enqueued += numToEnqueue c.Unlock() return c.enqueueDelayedFn, c.enqueueDelayedDoneFn, nil } diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index bd3729bafe..4c9dc1c716 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -33,9 +33,7 @@ import ( "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" - "github.com/uber-go/tally" "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) var ( @@ -70,142 +68,24 @@ const ( type bootstrapFn func() error -type instrumentation struct { - opts Options - log *zap.Logger - nowFn clock.NowFn - sleepFn sleepFn - status tally.Gauge - bootstrapDuration tally.Timer - bootstrapNamespacesDuration tally.Timer - durableStatus tally.Gauge - lastBootstrapCompletionTime xtime.UnixNano - start time.Time - startNamespaces time.Time - logFields []zapcore.Field -} - -func (i *instrumentation) bootstrapFnFailed(retry int) { - i.log.Warn("retrying bootstrap after backoff", - zap.Duration("backoff", bootstrapRetryInterval), - zap.Int("numRetries", retry+1)) - i.sleepFn(bootstrapRetryInterval) -} - -func (i *instrumentation) bootstrapPreparing() { - i.start = i.nowFn() - i.log.Info("bootstrap prepare") -} - -func (i *instrumentation) bootstrapPrepareFailed(err error) { - i.log.Error("bootstrap prepare failed", zap.Error(err)) -} - -func (i *instrumentation) bootstrapStarted(shards int) { - i.logFields = []zapcore.Field{ - zap.Int("numShards", shards), - } - i.log.Info("bootstrap started", i.logFields...) -} - -func (i *instrumentation) bootstrapSucceeded() { - bootstrapDuration := i.nowFn().Sub(i.start) - i.bootstrapDuration.Record(bootstrapDuration) - i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) - i.log.Info("bootstrap succeeded, marking namespaces complete", i.logFields...) -} - -func (i *instrumentation) bootstrapFailed(err error) { - bootstrapDuration := i.nowFn().Sub(i.start) - i.bootstrapDuration.Record(bootstrapDuration) - i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) - i.log.Error("bootstrap failed", append(i.logFields, zap.Error(err))...) -} - -func (i *instrumentation) bootstrapNamespaceFailed(err error, namespaceID string) { - i.log.Info("bootstrap namespace error", append(i.logFields, []zapcore.Field{ - zap.String("namespace", namespaceID), - zap.Error(err), - }...)...) -} - -func (i *instrumentation) bootstrapNamespacesFailed(err error) { - duration := i.nowFn().Sub(i.startNamespaces) - i.bootstrapNamespacesDuration.Record(duration) - i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) - i.log.Info("bootstrap namespaces failed", append(i.logFields, zap.Error(err))...) -} - -func (i *instrumentation) bootstrapNamespacesStarted() { - i.startNamespaces = i.nowFn() - i.log.Info("bootstrap namespaces start", i.logFields...) -} - -func (i *instrumentation) bootstrapNamespacesSucceeded() { - duration := i.nowFn().Sub(i.startNamespaces) - i.bootstrapNamespacesDuration.Record(duration) - i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) - i.log.Info("bootstrap namespaces success", i.logFields...) -} - -func (i *instrumentation) bootstrapCompletion() { - i.lastBootstrapCompletionTime = xtime.ToUnixNano(i.nowFn()) -} - -func (i *instrumentation) setIsBootstrapped(isBootstrapped bool) { - if isBootstrapped { - i.status.Update(1) - } else { - i.status.Update(0) - } -} - -func (i *instrumentation) setIsBootstrappedAndDurable(isBootstrappedAndDurable bool) { - if isBootstrappedAndDurable { - i.durableStatus.Update(1) - } else { - i.durableStatus.Update(0) - } -} - -func (i *instrumentation) missingNamespaceFromResult(err error) { - instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), func(l *zap.Logger) { - l.Error("bootstrap failed", append(i.logFields, zap.Error(err))...) - }) -} - -func (i *instrumentation) bootstrapDataAccumulatorCloseFailed(err error) { - instrument.EmitAndLogInvariantViolation(i.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("could not close bootstrap data accumulator", - zap.Error(err)) - }) -} - -func newInstrumentation(opts Options) *instrumentation { - scope := opts.InstrumentOptions().MetricsScope() - return &instrumentation{ - opts: opts, - log: opts.InstrumentOptions().Logger(), - nowFn: opts.ClockOptions().NowFn(), - sleepFn: time.Sleep, - status: scope.Gauge("bootstrapped"), - bootstrapDuration: scope.Timer("bootstrap-duration"), - bootstrapNamespacesDuration: scope.Timer("bootstrap-namespaces-duration"), - durableStatus: scope.Gauge("bootstrapped-durable"), - } +type bootstrapNamespace struct { + namespace databaseNamespace + shards []databaseShard } type bootstrapManager struct { sync.RWMutex - database database - mediator databaseMediator - bootstrapFn bootstrapFn - processProvider bootstrap.ProcessProvider - state BootstrapState - hasPending bool - instrumentation *instrumentation + database database + mediator databaseMediator + bootstrapFn bootstrapFn + processProvider bootstrap.ProcessProvider + state BootstrapState + hasPending bool + sleepFn sleepFn + nowFn clock.NowFn + lastBootstrapCompletionTime xtime.UnixNano + instrumentation *bootstrapInstrumentation } func newBootstrapManager( @@ -217,6 +97,8 @@ func newBootstrapManager( database: database, mediator: mediator, processProvider: opts.BootstrapProcessProvider(), + sleepFn: time.Sleep, + nowFn: opts.ClockOptions().NowFn(), instrumentation: newInstrumentation(opts), } m.bootstrapFn = m.bootstrap @@ -232,7 +114,7 @@ func (m *bootstrapManager) IsBootstrapped() bool { func (m *bootstrapManager) LastBootstrapCompletionTime() (xtime.UnixNano, bool) { m.RLock() - bsTime := m.instrumentation.lastBootstrapCompletionTime + bsTime := m.lastBootstrapCompletionTime m.RUnlock() return bsTime, bsTime > 0 } @@ -289,6 +171,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // failure we retry the bootstrap again. This is to avoid operators // needing to manually intervene for cases where failures are transient. m.instrumentation.bootstrapFnFailed(i + 1) + m.sleepFn(bootstrapRetryInterval) continue } @@ -305,7 +188,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // on its own course so that the load of ticking and flushing is more spread out // across the cluster. m.Lock() - m.instrumentation.bootstrapCompletion() + m.lastBootstrapCompletionTime = xtime.ToUnixNano(m.nowFn()) m.Unlock() return result, nil } @@ -315,11 +198,6 @@ func (m *bootstrapManager) Report() { m.instrumentation.setIsBootstrappedAndDurable(m.database.IsBootstrappedAndDurable()) } -type bootstrapNamespace struct { - namespace databaseNamespace - shards []databaseShard -} - func (m *bootstrapManager) bootstrap() error { ctx := context.NewContext() defer ctx.Close() @@ -343,12 +221,16 @@ func (m *bootstrapManager) bootstrap() error { // an error returned. for _, accumulator := range accmulators { if err := accumulator.Close(); err != nil { - m.instrumentation.bootstrapDataAccumulatorCloseFailed(err) + instrument.EmitAndLogInvariantViolation(m.instrumentation.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("could not close bootstrap data accumulator", + zap.Error(err)) + }) } } }() - m.instrumentation.bootstrapPreparing() + instrCtx := m.instrumentation.bootstrapPreparing() var ( bootstrapNamespaces = make([]bootstrapNamespace, len(namespaces)) prepareWg sync.WaitGroup @@ -423,19 +305,19 @@ func (m *bootstrapManager) bootstrap() error { }) } - m.instrumentation.bootstrapStarted(len(uniqueShards)) + m.instrumentation.bootstrapStarted(instrCtx, len(uniqueShards)) // Run the bootstrap. - bootstrapResult, err := process.Run(ctx, m.instrumentation.start, targets) + bootstrapResult, err := process.Run(ctx, instrCtx.start, targets) if err != nil { - m.instrumentation.bootstrapFailed(err) + m.instrumentation.bootstrapFailed(instrCtx, err) return err } - m.instrumentation.bootstrapSucceeded() + m.instrumentation.bootstrapSucceeded(instrCtx) + + instrCtx = m.instrumentation.bootstrapNamespacesStarted(instrCtx) // Use a multi-error here because we want to at least bootstrap // as many of the namespaces as possible. - - m.instrumentation.bootstrapNamespacesStarted() multiErr := xerrors.NewMultiError() for _, namespace := range namespaces { id := namespace.ID() @@ -443,21 +325,25 @@ func (m *bootstrapManager) bootstrap() error { if !ok { err := fmt.Errorf("missing namespace from bootstrap result: %v", id.String()) - m.instrumentation.missingNamespaceFromResult(err) + instrument.EmitAndLogInvariantViolation(m.instrumentation.opts.InstrumentOptions(), + func(l *zap.Logger) { + l.Error("bootstrap failed", + append(instrCtx.logFields, zap.Error(err))...) + }) return err } if err := namespace.Bootstrap(ctx, result); err != nil { - m.instrumentation.bootstrapNamespaceFailed(err, id.String()) + m.instrumentation.bootstrapNamespaceFailed(instrCtx, err, id) multiErr = multiErr.Add(err) } } if err := multiErr.FinalError(); err != nil { - m.instrumentation.bootstrapNamespacesFailed(err) + m.instrumentation.bootstrapNamespacesFailed(instrCtx, err) return err } - m.instrumentation.bootstrapNamespacesSucceeded() + m.instrumentation.bootstrapNamespacesSucceeded(instrCtx) return nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go index fbca789c3f..8b9fbdc1fd 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go @@ -17,7 +17,7 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. - +// Package peers implements peers bootstrapping. package peers import ( diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index bef2d987ec..179ee4fdf7 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -28,8 +28,6 @@ import ( "time" "github.com/opentracing/opentracing-go" - opentracinglog "github.com/opentracing/opentracing-go/log" - "github.com/uber-go/tally" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -46,11 +44,9 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/topology" - "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/index/segment/fst" idxpersist "github.com/m3db/m3/src/m3ninx/persist" - "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" @@ -60,9 +56,9 @@ import ( ) type peersSource struct { - opts Options + opts Options newPersistManager func() (persist.Manager, error) - instrumentation *instrumentation + instrumentation *instrumentation } type persistenceFlush struct { @@ -72,189 +68,6 @@ type persistenceFlush struct { timeRange xtime.Range } -type instrumentation struct { - opts Options - log *zap.Logger - nowFn clock.NowFn - bootstrapDataDuration tally.Timer - bootstrapIndexDuration tally.Timer - bootstrapShardsDuration tally.Timer - persistedIndexBlocksOutOfRetention tally.Counter - start time.Time - startShards time.Time -} - -func newInstrumentation(opts Options) *instrumentation { - instrumentOptions := opts.ResultOptions().InstrumentOptions() - scope := instrumentOptions.MetricsScope().SubScope("peers-bootstrapper") - instrumentOptions = instrumentOptions.SetMetricsScope(scope) - return &instrumentation{ - opts: opts, - log: instrumentOptions.Logger().With(zap.String("bootstrapper", "peers")), - nowFn: opts.ResultOptions().ClockOptions().NowFn(), - bootstrapDataDuration: scope.Timer("peer-bootstrap-data-duration"), - bootstrapIndexDuration: scope.Timer("peer-bootstrap-index-duration"), - bootstrapShardsDuration: scope.Timer("peer-bootstrap-shards-duration"), - persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), - } -} - -func (i *instrumentation) bootstrapDataStarted(span opentracing.Span) { - i.start = i.nowFn() - i.log.Info("bootstrapping time series data start") - span.LogFields(opentracinglog.String("event", "bootstrap_data_start")) -} - -func (i *instrumentation) bootstrapDataCompleted(span opentracing.Span) { - duration := i.nowFn().Sub(i.start) - i.bootstrapDataDuration.Record(duration) - i.log.Info("bootstrapping time series data success", zap.Duration("took", duration)) - span.LogFields(opentracinglog.String("event", "bootstrap_data_done")) -} - -func (i *instrumentation) bootstrapIndexStarted(span opentracing.Span) { - i.start = i.nowFn() - i.log.Info("bootstrapping index metadata start") - span.LogFields(opentracinglog.String("event", "bootstrap_index_start")) -} - -func (i *instrumentation) bootstrapIndexCompleted(span opentracing.Span) { - duration := i.nowFn().Sub(i.start) - i.bootstrapIndexDuration.Record(duration) - i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration)) - span.LogFields(opentracinglog.String("event", "bootstrap_index_done")) -} - -func (i *instrumentation) bootstrapIndexSkipped(namespaceID ident.ID) { - i.log.Info("skipping bootstrap for namespace based on options", - zap.Stringer("namespace", namespaceID)) -} - -func (i *instrumentation) getDefaultAdminSessionFailed(err error) { - i.log.Error("peers bootstrapper cannot get default admin session", zap.Error(err)) -} - -func (i *instrumentation) bootstrapShardsStarted(count int, concurrency int, shouldPersist bool) { - i.startShards = i.nowFn() - i.log.Info("peers bootstrapper bootstrapping shards for ranges", - zap.Int("shards", count), - zap.Int("concurrency", concurrency), - zap.Bool("shouldPersist", shouldPersist)) -} - -func (i *instrumentation) bootstrapShardsCompleted() { - duration := i.nowFn().Sub(i.startShards) - i.bootstrapShardsDuration.Record(duration) - i.log.Info("bootstrapping shards success", zap.Duration("took", duration)) -} - -func (i *instrumentation) persistenceFlushFailed(err error) { - i.log.Error("peers bootstrapper bootstrap with persistence flush encountered error", - zap.Error(err)) -} - -func (i *instrumentation) seriesCheckoutFailed(err error) { - i.log.Error("could not checkout series", zap.Error(err)) -} - -func (i *instrumentation) seriesLoadFailed(err error) { - i.log.Error("could not load series block", zap.Error(err)) -} - -func (i *instrumentation) shardBootstrapped(shard uint32, numSeries int64, blockTime time.Time) { - i.log.Info("peer bootstrapped shard", - zap.Uint32("shard", shard), - zap.Int64("numSeries", numSeries), - zap.Time("blockStart", blockTime), - ) -} - -func (i *instrumentation) fetchBootstrapBlocksFailed(err error, shard uint32) { - i.log.Error("error fetching bootstrap blocks", - zap.Uint32("shard", shard), - zap.Error(err), - ) -} - -func (i *instrumentation) peersBootstrapperIndexForRanges(count int) { - i.log.Info("peers bootstrapper bootstrapping index for ranges", - zap.Int("shards", count), - ) -} - -func (i *instrumentation) processingReadersFailed(err error, start time.Time) { - i.log.Error("error processing readers", zap.Error(err), - zap.Time("timeRange.start", start)) -} - -func (i *instrumentation) buildingFileSetIndexSegmentStarted(fields []zapcore.Field) { - i.log.Debug("building file set index segment", fields...) -} - -func (i *instrumentation) outOfRetentionIndexSegmentSkipped(fields []zapcore.Field) { - i.log.Debug("skipping out of retention index segment", fields...) - i.persistedIndexBlocksOutOfRetention.Inc(1) -} - -func (i *instrumentation) buildingInMemoryIndexSegmentStarted(fields []zapcore.Field) { - i.log.Info("building in-memory index segment", fields...) -} - -func (i *instrumentation) errorsForRangeEncountered(summaryString string, errorsString []string) { - i.log.Info("encountered errors for range", - zap.String("requestedRanges", summaryString), - zap.Strings("timesWithErrors", errorsString)) -} - -func (i *instrumentation) noPeersAvailable(total int, shardIDUint uint32) { - i.log.Debug( - "0 available peers, unable to peer bootstrap", - zap.Int("total", total), zap.Uint32("shard", shardIDUint)) -} - -func (i *instrumentation) readConsistencyNotAchieved( - bootstrapConsistencyLevel topology.ReadConsistencyLevel, - majorityReplicas int, - total int, - available int, -) { - i.log.Debug( - "read consistency not achieved, unable to peer bootstrap", - zap.Any("level", bootstrapConsistencyLevel), - zap.Int("replicas", majorityReplicas), - zap.Int("total", total), - zap.Int("available", available)) -} - -func (i *instrumentation) peersBootstrapperSourceReadStarted(ctx context.Context) ( - context.Context, opentracing.Span, bool) { - return ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) -} - -func (i *instrumentation) persistFsIndexBootstrapFailed(err error, - iopts instrument.Options, - id ident.ID, - ranges result.ShardTimeRanges) { - instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { - l.Error("persist fs index bootstrap failed", - zap.Stringer("namespace", id), - zap.Stringer("requestedRanges", ranges), - zap.Error(err)) - }) -} - -func (i *instrumentation) buildFsIndexBootstrapFailed(err error, - iopts instrument.Options, - id ident.ID, - ranges result.ShardTimeRanges) { - instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { - l.Error("build fs index bootstrap failed", - zap.Stringer("namespace", id), - zap.Stringer("requestedRanges", ranges), - zap.Error(err)) - }) -} - func newPeersSource(opts Options) (bootstrap.Source, error) { if err := opts.Validate(); err != nil { return nil, err @@ -303,7 +116,7 @@ func (s *peersSource) Read( namespaces bootstrap.Namespaces, cache bootstrap.Cache, ) (bootstrap.NamespaceResults, error) { - _, span, _ := s.instrumentation.peersBootstrapperSourceReadStarted(ctx) + span := s.instrumentation.peersBootstrapperSourceReadStarted(ctx) defer span.Finish() timeRangesEmpty := true @@ -329,7 +142,7 @@ func (s *peersSource) Read( // NB(r): Perform all data bootstrapping first then index bootstrapping // to more clearly delineate which process is slower than the other. - s.instrumentation.bootstrapDataStarted(span) + instrCtx := s.instrumentation.bootstrapDataStarted(span) for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata @@ -347,12 +160,12 @@ func (s *peersSource) Read( DataResult: r, }) } - s.instrumentation.bootstrapDataCompleted(span) + s.instrumentation.bootstrapDataCompleted(instrCtx, span) // NB(bodu): We need to evict the info file cache before reading index data since we've // maybe fetched blocks from peers so the cached info file state is now stale. cache.Evict() - s.instrumentation.bootstrapIndexStarted(span) + instrCtx = s.instrumentation.bootstrapIndexStarted(span) for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata @@ -382,7 +195,7 @@ func (s *peersSource) Read( results.Results.Set(md.ID(), result) } - s.instrumentation.bootstrapIndexCompleted(span) + s.instrumentation.bootstrapIndexCompleted(instrCtx, span) return results, nil } @@ -437,7 +250,7 @@ func (s *peersSource) readData( concurrency = s.opts.ShardPersistenceConcurrency() } - s.instrumentation.bootstrapShardsStarted(count, concurrency, shouldPersist) + instrCtx := s.instrumentation.bootstrapShardsStarted(count, concurrency, shouldPersist) if shouldPersist { // Spin up persist workers. for i := 0; i < s.opts.ShardPersistenceFlushConcurrency(); i++ { @@ -481,7 +294,7 @@ func (s *peersSource) readData( } } - s.instrumentation.bootstrapShardsCompleted() + s.instrumentation.bootstrapShardsCompleted(instrCtx) return result, nil } @@ -648,7 +461,7 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( } shardBlockSeriesCounter := map[xtime.UnixNano]int64{} - for _, entry := range shardResult.AllSeries().Iter() { // nolint + for _, entry := range shardResult.AllSeries().Iter() { // nolint series := entry.Value() for blockStart := range series.Blocks.AllBlocks() { shardBlockSeriesCounter[blockStart]++ @@ -1111,7 +924,12 @@ func (s *peersSource) processReaders( s.instrumentation.outOfRetentionIndexSegmentSkipped(buildIndexLogFields) return remainingRanges, timesWithErrors } else if err != nil { - s.instrumentation.persistFsIndexBootstrapFailed(err, iopts, ns.ID(), requestedRanges) + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("persist fs index bootstrap failed", + zap.Stringer("namespace", ns.ID()), + zap.Stringer("requestedRanges", requestedRanges), + zap.Error(err)) + }) } } else { s.instrumentation.buildingInMemoryIndexSegmentStarted(buildIndexLogFields) @@ -1126,7 +944,12 @@ func (s *peersSource) processReaders( blockEnd, ) if err != nil { - s.instrumentation.buildFsIndexBootstrapFailed(err, iopts, ns.ID(), requestedRanges) + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("build fs index bootstrap failed", + zap.Stringer("namespace", ns.ID()), + zap.Stringer("requestedRanges", requestedRanges), + zap.Error(err)) + }) } } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go new file mode 100644 index 0000000000..49ed93acc4 --- /dev/null +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go @@ -0,0 +1,208 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package peers + +import ( + "time" + + "github.com/opentracing/opentracing-go" + opentracinglog "github.com/opentracing/opentracing-go/log" + "github.com/uber-go/tally" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/m3db/m3/src/dbnode/topology" + "github.com/m3db/m3/src/dbnode/tracepoint" + "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/context" + "github.com/m3db/m3/src/x/ident" +) + +type instrumentationContext struct { + start time.Time +} + +type instrumentation struct { + opts Options + log *zap.Logger + nowFn clock.NowFn + bootstrapDataDuration tally.Timer + bootstrapIndexDuration tally.Timer + bootstrapShardsDuration tally.Timer + persistedIndexBlocksOutOfRetention tally.Counter +} + +func newInstrumentation(opts Options) *instrumentation { + var ( + scope = opts.ResultOptions().InstrumentOptions(). + MetricsScope().SubScope("peers-bootstrapper") + instrumentOptions = opts.ResultOptions().InstrumentOptions().SetMetricsScope(scope) + ) + + return &instrumentation{ + opts: opts, + log: instrumentOptions.Logger().With(zap.String("bootstrapper", "peers")), + nowFn: opts.ResultOptions().ClockOptions().NowFn(), + bootstrapDataDuration: scope.Timer("data-duration"), + bootstrapIndexDuration: scope.Timer("index-duration"), + bootstrapShardsDuration: scope.Timer("shards-duration"), + persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), + } +} + +func (i *instrumentation) bootstrapDataStarted(span opentracing.Span) *instrumentationContext { + i.log.Info("bootstrapping time series data start") + span.LogFields(opentracinglog.String("event", "bootstrap_data_start")) + return &instrumentationContext{ + start: i.nowFn(), + } +} + +func (i *instrumentation) bootstrapDataCompleted(ctx *instrumentationContext, span opentracing.Span) { + duration := i.nowFn().Sub(ctx.start) + i.bootstrapDataDuration.Record(duration) + i.log.Info("bootstrapping time series data success", zap.Duration("took", duration)) + span.LogFields(opentracinglog.String("event", "bootstrap_data_done")) +} + +func (i *instrumentation) bootstrapIndexStarted(span opentracing.Span) *instrumentationContext { + i.log.Info("bootstrapping index metadata start") + span.LogFields(opentracinglog.String("event", "bootstrap_index_start")) + return &instrumentationContext{start: i.nowFn()} +} + +func (i *instrumentation) bootstrapIndexCompleted(ctx *instrumentationContext, span opentracing.Span) { + duration := i.nowFn().Sub(ctx.start) + i.bootstrapIndexDuration.Record(duration) + i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration)) + span.LogFields(opentracinglog.String("event", "bootstrap_index_done")) +} + +func (i *instrumentation) bootstrapIndexSkipped(namespaceID ident.ID) { + i.log.Info("skipping bootstrap for namespace based on options", + zap.Stringer("namespace", namespaceID)) +} + +func (i *instrumentation) getDefaultAdminSessionFailed(err error) { + i.log.Error("peers bootstrapper cannot get default admin session", zap.Error(err)) +} + +func (i *instrumentation) bootstrapShardsStarted( + count int, + concurrency int, + shouldPersist bool, +) *instrumentationContext { + i.log.Info("peers bootstrapper bootstrapping shards for ranges", + zap.Int("shards", count), + zap.Int("concurrency", concurrency), + zap.Bool("shouldPersist", shouldPersist)) + return &instrumentationContext{start: i.nowFn()} +} + +func (i *instrumentation) bootstrapShardsCompleted(ctx *instrumentationContext) { + duration := i.nowFn().Sub(ctx.start) + i.bootstrapShardsDuration.Record(duration) + i.log.Info("bootstrapping shards success", zap.Duration("took", duration)) +} + +func (i *instrumentation) persistenceFlushFailed(err error) { + i.log.Error("peers bootstrapper bootstrap with persistence flush encountered error", + zap.Error(err)) +} + +func (i *instrumentation) seriesCheckoutFailed(err error) { + i.log.Error("could not checkout series", zap.Error(err)) +} + +func (i *instrumentation) seriesLoadFailed(err error) { + i.log.Error("could not load series block", zap.Error(err)) +} + +func (i *instrumentation) shardBootstrapped(shard uint32, numSeries int64, blockTime time.Time) { + i.log.Info("peer bootstrapped shard", + zap.Uint32("shard", shard), + zap.Int64("numSeries", numSeries), + zap.Time("blockStart", blockTime), + ) +} + +func (i *instrumentation) fetchBootstrapBlocksFailed(err error, shard uint32) { + i.log.Error("error fetching bootstrap blocks", + zap.Uint32("shard", shard), + zap.Error(err), + ) +} + +func (i *instrumentation) peersBootstrapperIndexForRanges(count int) { + i.log.Info("peers bootstrapper bootstrapping index for ranges", + zap.Int("shards", count), + ) +} + +func (i *instrumentation) processingReadersFailed(err error, start time.Time) { + i.log.Error("error processing readers", zap.Error(err), + zap.Time("timeRange.start", start)) +} + +func (i *instrumentation) buildingFileSetIndexSegmentStarted(fields []zapcore.Field) { + i.log.Debug("building file set index segment", fields...) +} + +func (i *instrumentation) outOfRetentionIndexSegmentSkipped(fields []zapcore.Field) { + i.log.Debug("skipping out of retention index segment", fields...) + i.persistedIndexBlocksOutOfRetention.Inc(1) +} + +func (i *instrumentation) buildingInMemoryIndexSegmentStarted(fields []zapcore.Field) { + i.log.Info("building in-memory index segment", fields...) +} + +func (i *instrumentation) errorsForRangeEncountered(summaryString string, errorsString []string) { + i.log.Info("encountered errors for range", + zap.String("requestedRanges", summaryString), + zap.Strings("timesWithErrors", errorsString)) +} + +func (i *instrumentation) noPeersAvailable(total int, shardIDUint uint32) { + i.log.Debug("0 available peers, unable to peer bootstrap", + zap.Int("total", total), + zap.Uint32("shard", shardIDUint)) +} + +func (i *instrumentation) readConsistencyNotAchieved( + bootstrapConsistencyLevel topology.ReadConsistencyLevel, + majorityReplicas int, + total int, + available int, +) { + i.log.Debug("read consistency not achieved, unable to peer bootstrap", + zap.Any("level", bootstrapConsistencyLevel), + zap.Int("replicas", majorityReplicas), + zap.Int("total", total), + zap.Int("available", available)) +} + +func (i *instrumentation) peersBootstrapperSourceReadStarted( + ctx context.Context, +) opentracing.Span { + _, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) + return span +} diff --git a/src/dbnode/storage/bootstrap_instrumentation.go b/src/dbnode/storage/bootstrap_instrumentation.go new file mode 100644 index 0000000000..fe2ce7bbc6 --- /dev/null +++ b/src/dbnode/storage/bootstrap_instrumentation.go @@ -0,0 +1,145 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package storage + +import ( + "time" + + "github.com/uber-go/tally" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/m3db/m3/src/x/clock" + "github.com/m3db/m3/src/x/ident" +) + +type instrumentationContext struct { + start time.Time + logFields []zapcore.Field +} + +type bootstrapInstrumentation struct { + opts Options + log *zap.Logger + nowFn clock.NowFn + status tally.Gauge + bootstrapDuration tally.Timer + bootstrapNamespacesDuration tally.Timer + durableStatus tally.Gauge +} + +func newInstrumentation(opts Options) *bootstrapInstrumentation { + scope := opts.InstrumentOptions().MetricsScope() + return &bootstrapInstrumentation{ + opts: opts, + log: opts.InstrumentOptions().Logger(), + nowFn: opts.ClockOptions().NowFn(), + status: scope.Gauge("bootstrapped"), + bootstrapDuration: scope.Timer("bootstrap-duration"), + bootstrapNamespacesDuration: scope.Timer("bootstrap-namespaces-duration"), + durableStatus: scope.Gauge("bootstrapped-durable"), + } +} + +func (i *bootstrapInstrumentation) bootstrapFnFailed(retry int) { + i.log.Warn("retrying bootstrap after backoff", + zap.Duration("backoff", bootstrapRetryInterval), + zap.Int("numRetries", retry+1)) +} + +func (i *bootstrapInstrumentation) bootstrapPreparing() *instrumentationContext { + i.log.Info("bootstrap prepare") + return &instrumentationContext{ + start: i.nowFn(), + } +} + +func (i *bootstrapInstrumentation) bootstrapPrepareFailed(err error) { + i.log.Error("bootstrap prepare failed", zap.Error(err)) +} + +func (i *bootstrapInstrumentation) bootstrapStarted(ctx *instrumentationContext, shards int) { + ctx.logFields = append(ctx.logFields, zap.Int("numShards", shards)) + i.log.Info("bootstrap started", ctx.logFields...) +} + +func (i *bootstrapInstrumentation) bootstrapSucceeded(ctx *instrumentationContext) { + bootstrapDuration := i.nowFn().Sub(ctx.start) + i.bootstrapDuration.Record(bootstrapDuration) + ctx.logFields = append(ctx.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) + i.log.Info("bootstrap succeeded, marking namespaces complete", ctx.logFields...) +} + +func (i *bootstrapInstrumentation) bootstrapFailed(ctx *instrumentationContext, err error) { + bootstrapDuration := i.nowFn().Sub(ctx.start) + i.bootstrapDuration.Record(bootstrapDuration) + ctx.logFields = append(ctx.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) + i.log.Error("bootstrap failed", append(ctx.logFields, zap.Error(err))...) +} + +func (i *bootstrapInstrumentation) bootstrapNamespaceFailed( + ctx *instrumentationContext, + err error, + namespaceID ident.ID, +) { + i.log.Info("bootstrap namespace error", append(ctx.logFields, []zapcore.Field{ + zap.String("namespace", namespaceID.String()), + zap.Error(err), + }...)...) +} + +func (i *bootstrapInstrumentation) bootstrapNamespacesFailed(ctx *instrumentationContext, err error) { + duration := i.nowFn().Sub(ctx.start) + i.bootstrapNamespacesDuration.Record(duration) + ctx.logFields = append(ctx.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) + i.log.Info("bootstrap namespaces failed", append(ctx.logFields, zap.Error(err))...) +} + +func (i *bootstrapInstrumentation) bootstrapNamespacesStarted(ctx *instrumentationContext) *instrumentationContext { + i.log.Info("bootstrap namespaces start", ctx.logFields...) + return &instrumentationContext{ + start: i.nowFn(), + logFields: ctx.logFields, + } +} + +func (i *bootstrapInstrumentation) bootstrapNamespacesSucceeded(ctx *instrumentationContext) { + duration := i.nowFn().Sub(ctx.start) + i.bootstrapNamespacesDuration.Record(duration) + ctx.logFields = append(ctx.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) + i.log.Info("bootstrap namespaces success", ctx.logFields...) +} + +func (i *bootstrapInstrumentation) setIsBootstrapped(isBootstrapped bool) { + var status float64 = 0 + if isBootstrapped { + status = 1 + } + i.status.Update(status) +} + +func (i *bootstrapInstrumentation) setIsBootstrappedAndDurable(isBootstrappedAndDurable bool) { + var status float64 = 0 + if isBootstrappedAndDurable { + status = 1 + } + i.durableStatus.Update(status) +} diff --git a/src/dbnode/storage/bootstrap_test.go b/src/dbnode/storage/bootstrap_test.go index 1faebed605..95337e996f 100644 --- a/src/dbnode/storage/bootstrap_test.go +++ b/src/dbnode/storage/bootstrap_test.go @@ -63,7 +63,7 @@ func TestDatabaseBootstrapWithBootstrapError(t *testing.T) { bsm := newBootstrapManager(db, m, opts).(*bootstrapManager) // Don't sleep. - bsm.instrumentation.sleepFn = func(time.Duration) {} + bsm.sleepFn = func(time.Duration) {} gomock.InOrder( ns.EXPECT().PrepareBootstrap(gomock.Any()).Return([]databaseShard{}, nil), From cb237a35e9ac97fafc512c660fe9cf4a63e1fec9 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Thu, 7 Jan 2021 17:07:27 +0200 Subject: [PATCH 06/10] added new gauge - bootstrap-retries --- .../bootstrapper/peers/source_instrumentation.go | 4 +--- src/dbnode/storage/bootstrap_instrumentation.go | 9 +++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go index 49ed93acc4..be783cb175 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go @@ -71,9 +71,7 @@ func newInstrumentation(opts Options) *instrumentation { func (i *instrumentation) bootstrapDataStarted(span opentracing.Span) *instrumentationContext { i.log.Info("bootstrapping time series data start") span.LogFields(opentracinglog.String("event", "bootstrap_data_start")) - return &instrumentationContext{ - start: i.nowFn(), - } + return &instrumentationContext{start: i.nowFn()} } func (i *instrumentation) bootstrapDataCompleted(ctx *instrumentationContext, span opentracing.Span) { diff --git a/src/dbnode/storage/bootstrap_instrumentation.go b/src/dbnode/storage/bootstrap_instrumentation.go index fe2ce7bbc6..c287da3ce8 100644 --- a/src/dbnode/storage/bootstrap_instrumentation.go +++ b/src/dbnode/storage/bootstrap_instrumentation.go @@ -44,6 +44,7 @@ type bootstrapInstrumentation struct { bootstrapDuration tally.Timer bootstrapNamespacesDuration tally.Timer durableStatus tally.Gauge + numRetries tally.Gauge } func newInstrumentation(opts Options) *bootstrapInstrumentation { @@ -56,20 +57,20 @@ func newInstrumentation(opts Options) *bootstrapInstrumentation { bootstrapDuration: scope.Timer("bootstrap-duration"), bootstrapNamespacesDuration: scope.Timer("bootstrap-namespaces-duration"), durableStatus: scope.Gauge("bootstrapped-durable"), + numRetries: scope.Gauge("bootstrap-retries"), } } func (i *bootstrapInstrumentation) bootstrapFnFailed(retry int) { + i.numRetries.Update(float64(retry)) i.log.Warn("retrying bootstrap after backoff", zap.Duration("backoff", bootstrapRetryInterval), - zap.Int("numRetries", retry+1)) + zap.Int("numRetries", retry)) } func (i *bootstrapInstrumentation) bootstrapPreparing() *instrumentationContext { i.log.Info("bootstrap prepare") - return &instrumentationContext{ - start: i.nowFn(), - } + return &instrumentationContext{start: i.nowFn()} } func (i *bootstrapInstrumentation) bootstrapPrepareFailed(err error) { From 078570dd5370776f3c5209cd78cef352aba2d4fe Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 11 Jan 2021 12:42:21 +0200 Subject: [PATCH 07/10] moved some instrumentation methods to instrumentationContexts for better encapsulation and easier usage. --- src/dbnode/storage/bootstrap.go | 33 ++-- .../bootstrap/bootstrapper/peers/peers.go | 1 + .../bootstrap/bootstrapper/peers/source.go | 18 +-- .../peers/source_instrumentation.go | 125 +++++++++------ .../storage/bootstrap_instrumentation.go | 145 ++++++++++-------- 5 files changed, 175 insertions(+), 147 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index 4c9dc1c716..92ed6f1afb 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -32,8 +32,6 @@ import ( xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" - - "go.uber.org/zap" ) var ( @@ -214,6 +212,8 @@ func (m *bootstrapManager) bootstrap() error { return err } + instrCtx := m.instrumentation.bootstrapPreparing() + accmulators := make([]bootstrap.NamespaceDataAccumulator, 0, len(namespaces)) defer func() { // Close all accumulators at bootstrap completion, only error @@ -221,16 +221,12 @@ func (m *bootstrapManager) bootstrap() error { // an error returned. for _, accumulator := range accmulators { if err := accumulator.Close(); err != nil { - instrument.EmitAndLogInvariantViolation(m.instrumentation.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("could not close bootstrap data accumulator", - zap.Error(err)) - }) + instrument.EmitAndLogInvariantViolation(instrCtx.instrumentOptions, + instrCtx.logFn(err, "could not close bootstrap data accumulator")) } } }() - instrCtx := m.instrumentation.bootstrapPreparing() var ( bootstrapNamespaces = make([]bootstrapNamespace, len(namespaces)) prepareWg sync.WaitGroup @@ -305,17 +301,17 @@ func (m *bootstrapManager) bootstrap() error { }) } - m.instrumentation.bootstrapStarted(instrCtx, len(uniqueShards)) + instrCtx.bootstrapStarted(len(uniqueShards)) // Run the bootstrap. bootstrapResult, err := process.Run(ctx, instrCtx.start, targets) if err != nil { - m.instrumentation.bootstrapFailed(instrCtx, err) + instrCtx.bootstrapFailed(err) return err } - m.instrumentation.bootstrapSucceeded(instrCtx) + instrCtx.bootstrapSucceeded() - instrCtx = m.instrumentation.bootstrapNamespacesStarted(instrCtx) + instrCtx.bootstrapNamespacesStarted() // Use a multi-error here because we want to at least bootstrap // as many of the namespaces as possible. multiErr := xerrors.NewMultiError() @@ -325,25 +321,22 @@ func (m *bootstrapManager) bootstrap() error { if !ok { err := fmt.Errorf("missing namespace from bootstrap result: %v", id.String()) - instrument.EmitAndLogInvariantViolation(m.instrumentation.opts.InstrumentOptions(), - func(l *zap.Logger) { - l.Error("bootstrap failed", - append(instrCtx.logFields, zap.Error(err))...) - }) + instrument.EmitAndLogInvariantViolation(instrCtx.instrumentOptions, + instrCtx.logFn(err, "bootstrap failed")) return err } if err := namespace.Bootstrap(ctx, result); err != nil { - m.instrumentation.bootstrapNamespaceFailed(instrCtx, err, id) + instrCtx.bootstrapNamespaceFailed(err, id) multiErr = multiErr.Add(err) } } if err := multiErr.FinalError(); err != nil { - m.instrumentation.bootstrapNamespacesFailed(instrCtx, err) + instrCtx.bootstrapNamespacesFailed(err) return err } - m.instrumentation.bootstrapNamespacesSucceeded(instrCtx) + instrCtx.bootstrapNamespacesSucceeded() return nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go index 8b9fbdc1fd..cb536f5edc 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/peers.go @@ -17,6 +17,7 @@ // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. + // Package peers implements peers bootstrapping. package peers diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index 179ee4fdf7..b20b3c7ee1 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -116,8 +116,8 @@ func (s *peersSource) Read( namespaces bootstrap.Namespaces, cache bootstrap.Cache, ) (bootstrap.NamespaceResults, error) { - span := s.instrumentation.peersBootstrapperSourceReadStarted(ctx) - defer span.Finish() + instrCtx := s.instrumentation.peersBootstrapperSourceReadStarted(ctx) + defer instrCtx.finish() timeRangesEmpty := true for _, elem := range namespaces.Namespaces.Iter() { @@ -142,7 +142,7 @@ func (s *peersSource) Read( // NB(r): Perform all data bootstrapping first then index bootstrapping // to more clearly delineate which process is slower than the other. - instrCtx := s.instrumentation.bootstrapDataStarted(span) + instrCtx.bootstrapDataStarted() for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata @@ -160,23 +160,23 @@ func (s *peersSource) Read( DataResult: r, }) } - s.instrumentation.bootstrapDataCompleted(instrCtx, span) + instrCtx.bootstrapDataCompleted() // NB(bodu): We need to evict the info file cache before reading index data since we've // maybe fetched blocks from peers so the cached info file state is now stale. cache.Evict() - instrCtx = s.instrumentation.bootstrapIndexStarted(span) + instrCtx.bootstrapIndexStarted() for _, elem := range namespaces.Namespaces.Iter() { namespace := elem.Value() md := namespace.Metadata if !md.Options().IndexOptions().Enabled() { - s.instrumentation.bootstrapIndexSkipped(md.ID()) + instrCtx.bootstrapIndexSkipped(md.ID()) continue } r, err := s.readIndex(md, namespace.IndexRunOptions.ShardTimeRanges, - span, + instrCtx.span, cache, namespace.IndexRunOptions.RunOptions, ) @@ -195,7 +195,7 @@ func (s *peersSource) Read( results.Results.Set(md.ID(), result) } - s.instrumentation.bootstrapIndexCompleted(instrCtx, span) + instrCtx.bootstrapIndexCompleted() return results, nil } @@ -294,7 +294,7 @@ func (s *peersSource) readData( } } - s.instrumentation.bootstrapShardsCompleted(instrCtx) + instrCtx.bootstrapShardsCompleted() return result, nil } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go index be783cb175..d2e29c35f2 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go @@ -37,16 +37,67 @@ import ( ) type instrumentationContext struct { - start time.Time + nowFn clock.NowFn + log *zap.Logger + start time.Time + span opentracing.Span + bootstrapDataDuration tally.Timer + bootstrapIndexDuration tally.Timer +} + +func (i *instrumentationContext) finish() { + i.span.Finish() +} + +func (i *instrumentationContext) bootstrapDataStarted() { + i.log.Info("bootstrapping time series data start") + i.span.LogFields(opentracinglog.String("event", "bootstrap_data_start")) + i.start = i.nowFn() +} + +func (i *instrumentationContext) bootstrapDataCompleted() { + duration := i.nowFn().Sub(i.start) + i.bootstrapDataDuration.Record(duration) + i.log.Info("bootstrapping time series data success", zap.Duration("took", duration)) + i.span.LogFields(opentracinglog.String("event", "bootstrap_data_done")) +} + +func (i *instrumentationContext) bootstrapIndexStarted() { + i.log.Info("bootstrapping index metadata start") + i.span.LogFields(opentracinglog.String("event", "bootstrap_index_start")) + i.start = i.nowFn() +} + +func (i *instrumentationContext) bootstrapIndexSkipped(namespaceID ident.ID) { + i.log.Info("skipping bootstrap for namespace based on options", + zap.Stringer("namespace", namespaceID)) +} + +func (i *instrumentationContext) bootstrapIndexCompleted() { + duration := i.nowFn().Sub(i.start) + i.bootstrapIndexDuration.Record(duration) + i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration)) + i.span.LogFields(opentracinglog.String("event", "bootstrap_index_done")) +} + +type instrumentationReadShardsContext struct { + nowFn clock.NowFn + log *zap.Logger + start time.Time + bootstrapShardsDuration tally.Timer +} + +func (i *instrumentationReadShardsContext) bootstrapShardsCompleted() { + duration := i.nowFn().Sub(i.start) + i.bootstrapShardsDuration.Record(duration) + i.log.Info("bootstrapping shards success", zap.Duration("took", duration)) } type instrumentation struct { opts Options + scope tally.Scope log *zap.Logger nowFn clock.NowFn - bootstrapDataDuration tally.Timer - bootstrapIndexDuration tally.Timer - bootstrapShardsDuration tally.Timer persistedIndexBlocksOutOfRetention tally.Counter } @@ -59,44 +110,24 @@ func newInstrumentation(opts Options) *instrumentation { return &instrumentation{ opts: opts, + scope: scope, log: instrumentOptions.Logger().With(zap.String("bootstrapper", "peers")), nowFn: opts.ResultOptions().ClockOptions().NowFn(), - bootstrapDataDuration: scope.Timer("data-duration"), - bootstrapIndexDuration: scope.Timer("index-duration"), - bootstrapShardsDuration: scope.Timer("shards-duration"), persistedIndexBlocksOutOfRetention: scope.Counter("persist-index-blocks-out-of-retention"), } } -func (i *instrumentation) bootstrapDataStarted(span opentracing.Span) *instrumentationContext { - i.log.Info("bootstrapping time series data start") - span.LogFields(opentracinglog.String("event", "bootstrap_data_start")) - return &instrumentationContext{start: i.nowFn()} -} - -func (i *instrumentation) bootstrapDataCompleted(ctx *instrumentationContext, span opentracing.Span) { - duration := i.nowFn().Sub(ctx.start) - i.bootstrapDataDuration.Record(duration) - i.log.Info("bootstrapping time series data success", zap.Duration("took", duration)) - span.LogFields(opentracinglog.String("event", "bootstrap_data_done")) -} - -func (i *instrumentation) bootstrapIndexStarted(span opentracing.Span) *instrumentationContext { - i.log.Info("bootstrapping index metadata start") - span.LogFields(opentracinglog.String("event", "bootstrap_index_start")) - return &instrumentationContext{start: i.nowFn()} -} - -func (i *instrumentation) bootstrapIndexCompleted(ctx *instrumentationContext, span opentracing.Span) { - duration := i.nowFn().Sub(ctx.start) - i.bootstrapIndexDuration.Record(duration) - i.log.Info("bootstrapping index metadata success", zap.Duration("took", duration)) - span.LogFields(opentracinglog.String("event", "bootstrap_index_done")) -} - -func (i *instrumentation) bootstrapIndexSkipped(namespaceID ident.ID) { - i.log.Info("skipping bootstrap for namespace based on options", - zap.Stringer("namespace", namespaceID)) +func (i *instrumentation) peersBootstrapperSourceReadStarted( + ctx context.Context, +) *instrumentationContext { + _, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) + return &instrumentationContext{ + nowFn: i.nowFn, + log: i.log, + span: span, + bootstrapDataDuration: i.scope.Timer("data-duration"), + bootstrapIndexDuration: i.scope.Timer("index-duration"), + } } func (i *instrumentation) getDefaultAdminSessionFailed(err error) { @@ -107,18 +138,17 @@ func (i *instrumentation) bootstrapShardsStarted( count int, concurrency int, shouldPersist bool, -) *instrumentationContext { +) *instrumentationReadShardsContext { i.log.Info("peers bootstrapper bootstrapping shards for ranges", zap.Int("shards", count), zap.Int("concurrency", concurrency), zap.Bool("shouldPersist", shouldPersist)) - return &instrumentationContext{start: i.nowFn()} -} - -func (i *instrumentation) bootstrapShardsCompleted(ctx *instrumentationContext) { - duration := i.nowFn().Sub(ctx.start) - i.bootstrapShardsDuration.Record(duration) - i.log.Info("bootstrapping shards success", zap.Duration("took", duration)) + return &instrumentationReadShardsContext{ + nowFn: i.nowFn, + log: i.log, + start: i.nowFn(), + bootstrapShardsDuration: i.scope.Timer("shards-duration"), + } } func (i *instrumentation) persistenceFlushFailed(err error) { @@ -197,10 +227,3 @@ func (i *instrumentation) readConsistencyNotAchieved( zap.Int("total", total), zap.Int("available", available)) } - -func (i *instrumentation) peersBootstrapperSourceReadStarted( - ctx context.Context, -) opentracing.Span { - _, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) - return span -} diff --git a/src/dbnode/storage/bootstrap_instrumentation.go b/src/dbnode/storage/bootstrap_instrumentation.go index c287da3ce8..a1821bfe4b 100644 --- a/src/dbnode/storage/bootstrap_instrumentation.go +++ b/src/dbnode/storage/bootstrap_instrumentation.go @@ -29,104 +29,115 @@ import ( "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/instrument" ) type instrumentationContext struct { - start time.Time - logFields []zapcore.Field -} - -type bootstrapInstrumentation struct { - opts Options + start time.Time log *zap.Logger - nowFn clock.NowFn - status tally.Gauge + logFields []zapcore.Field bootstrapDuration tally.Timer bootstrapNamespacesDuration tally.Timer - durableStatus tally.Gauge - numRetries tally.Gauge -} - -func newInstrumentation(opts Options) *bootstrapInstrumentation { - scope := opts.InstrumentOptions().MetricsScope() - return &bootstrapInstrumentation{ - opts: opts, - log: opts.InstrumentOptions().Logger(), - nowFn: opts.ClockOptions().NowFn(), - status: scope.Gauge("bootstrapped"), - bootstrapDuration: scope.Timer("bootstrap-duration"), - bootstrapNamespacesDuration: scope.Timer("bootstrap-namespaces-duration"), - durableStatus: scope.Gauge("bootstrapped-durable"), - numRetries: scope.Gauge("bootstrap-retries"), - } -} - -func (i *bootstrapInstrumentation) bootstrapFnFailed(retry int) { - i.numRetries.Update(float64(retry)) - i.log.Warn("retrying bootstrap after backoff", - zap.Duration("backoff", bootstrapRetryInterval), - zap.Int("numRetries", retry)) + nowFn clock.NowFn + instrumentOptions instrument.Options } -func (i *bootstrapInstrumentation) bootstrapPreparing() *instrumentationContext { - i.log.Info("bootstrap prepare") - return &instrumentationContext{start: i.nowFn()} +func (i *instrumentationContext) bootstrapStarted(shards int) { + i.logFields = append(i.logFields, zap.Int("numShards", shards)) + i.log.Info("bootstrap started", i.logFields...) } -func (i *bootstrapInstrumentation) bootstrapPrepareFailed(err error) { - i.log.Error("bootstrap prepare failed", zap.Error(err)) +func (i *instrumentationContext) bootstrapSucceeded() { + bootstrapDuration := i.nowFn().Sub(i.start) + i.bootstrapDuration.Record(bootstrapDuration) + i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) + i.log.Info("bootstrap succeeded, marking namespaces complete", i.logFields...) } -func (i *bootstrapInstrumentation) bootstrapStarted(ctx *instrumentationContext, shards int) { - ctx.logFields = append(ctx.logFields, zap.Int("numShards", shards)) - i.log.Info("bootstrap started", ctx.logFields...) +func (i *instrumentationContext) bootstrapFailed(err error) { + bootstrapDuration := i.nowFn().Sub(i.start) + i.bootstrapDuration.Record(bootstrapDuration) + i.logFields = append(i.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) + i.log.Error("bootstrap failed", append(i.logFields, zap.Error(err))...) } -func (i *bootstrapInstrumentation) bootstrapSucceeded(ctx *instrumentationContext) { - bootstrapDuration := i.nowFn().Sub(ctx.start) - i.bootstrapDuration.Record(bootstrapDuration) - ctx.logFields = append(ctx.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) - i.log.Info("bootstrap succeeded, marking namespaces complete", ctx.logFields...) +func (i *instrumentationContext) bootstrapNamespacesStarted() { + i.start = i.nowFn() + i.log.Info("bootstrap namespaces start", i.logFields...) } -func (i *bootstrapInstrumentation) bootstrapFailed(ctx *instrumentationContext, err error) { - bootstrapDuration := i.nowFn().Sub(ctx.start) - i.bootstrapDuration.Record(bootstrapDuration) - ctx.logFields = append(ctx.logFields, zap.Duration("bootstrapDuration", bootstrapDuration)) - i.log.Error("bootstrap failed", append(ctx.logFields, zap.Error(err))...) +func (i *instrumentationContext) bootstrapNamespacesSucceeded() { + duration := i.nowFn().Sub(i.start) + i.bootstrapNamespacesDuration.Record(duration) + i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) + i.log.Info("bootstrap namespaces success", i.logFields...) } -func (i *bootstrapInstrumentation) bootstrapNamespaceFailed( - ctx *instrumentationContext, +func (i *instrumentationContext) bootstrapNamespaceFailed( err error, namespaceID ident.ID, ) { - i.log.Info("bootstrap namespace error", append(ctx.logFields, []zapcore.Field{ + i.log.Info("bootstrap namespace error", append(i.logFields, zap.String("namespace", namespaceID.String()), - zap.Error(err), - }...)...) + zap.Error(err))...) } -func (i *bootstrapInstrumentation) bootstrapNamespacesFailed(ctx *instrumentationContext, err error) { - duration := i.nowFn().Sub(ctx.start) +func (i *instrumentationContext) bootstrapNamespacesFailed(err error) { + duration := i.nowFn().Sub(i.start) i.bootstrapNamespacesDuration.Record(duration) - ctx.logFields = append(ctx.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) - i.log.Info("bootstrap namespaces failed", append(ctx.logFields, zap.Error(err))...) + i.logFields = append(i.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) + i.log.Info("bootstrap namespaces failed", append(i.logFields, zap.Error(err))...) +} + +func (i *instrumentationContext) logFn(err error, msg string) func(l *zap.Logger) { + return func(l *zap.Logger) { + l.Error(msg, append(i.logFields, zap.Error(err))...) + } +} + +type bootstrapInstrumentation struct { + opts Options + scope tally.Scope + log *zap.Logger + nowFn clock.NowFn + status tally.Gauge + durableStatus tally.Gauge + numRetries tally.Counter +} + +func newInstrumentation(opts Options) *bootstrapInstrumentation { + scope := opts.InstrumentOptions().MetricsScope() + return &bootstrapInstrumentation{ + opts: opts, + scope: scope, + log: opts.InstrumentOptions().Logger(), + status: scope.Gauge("bootstrapped"), + durableStatus: scope.Gauge("bootstrapped-durable"), + numRetries: scope.Counter("bootstrap-retries"), + } } -func (i *bootstrapInstrumentation) bootstrapNamespacesStarted(ctx *instrumentationContext) *instrumentationContext { - i.log.Info("bootstrap namespaces start", ctx.logFields...) +func (i *bootstrapInstrumentation) bootstrapPreparing() *instrumentationContext { + i.log.Info("bootstrap prepare") return &instrumentationContext{ - start: i.nowFn(), - logFields: ctx.logFields, + start: i.nowFn(), + log: i.log, + nowFn: i.nowFn, + bootstrapDuration: i.scope.Timer("bootstrap-duration"), + bootstrapNamespacesDuration: i.scope.Timer("bootstrap-namespaces-duration"), + instrumentOptions: i.opts.InstrumentOptions(), } } -func (i *bootstrapInstrumentation) bootstrapNamespacesSucceeded(ctx *instrumentationContext) { - duration := i.nowFn().Sub(ctx.start) - i.bootstrapNamespacesDuration.Record(duration) - ctx.logFields = append(ctx.logFields, zap.Duration("bootstrapNamespacesDuration", duration)) - i.log.Info("bootstrap namespaces success", ctx.logFields...) +func (i *bootstrapInstrumentation) bootstrapFnFailed(retry int) { + i.numRetries.Inc(1) + i.log.Warn("retrying bootstrap after backoff", + zap.Duration("backoff", bootstrapRetryInterval), + zap.Int("numRetries", retry)) +} + +func (i *bootstrapInstrumentation) bootstrapPrepareFailed(err error) { + i.log.Error("bootstrap prepare failed", zap.Error(err)) } func (i *bootstrapInstrumentation) setIsBootstrapped(isBootstrapped bool) { From c784876fbf5c3bdde1c914bcc15e5b81b6bc2ea0 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 11 Jan 2021 13:44:50 +0200 Subject: [PATCH 08/10] forgot to set nowFn --- src/dbnode/storage/bootstrap_instrumentation.go | 1 + 1 file changed, 1 insertion(+) diff --git a/src/dbnode/storage/bootstrap_instrumentation.go b/src/dbnode/storage/bootstrap_instrumentation.go index a1821bfe4b..d004712de3 100644 --- a/src/dbnode/storage/bootstrap_instrumentation.go +++ b/src/dbnode/storage/bootstrap_instrumentation.go @@ -111,6 +111,7 @@ func newInstrumentation(opts Options) *bootstrapInstrumentation { opts: opts, scope: scope, log: opts.InstrumentOptions().Logger(), + nowFn: opts.ClockOptions().NowFn(), status: scope.Gauge("bootstrapped"), durableStatus: scope.Gauge("bootstrapped-durable"), numRetries: scope.Counter("bootstrap-retries"), From dce0daa9b6619f6a4a542dc8d685599c88b3311c Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Mon, 11 Jan 2021 14:09:51 +0200 Subject: [PATCH 09/10] added methods for creating instrumentation contexts. --- src/dbnode/storage/bootstrap.go | 2 +- .../peers/source_instrumentation.go | 52 ++++++++++++++----- .../storage/bootstrap_instrumentation.go | 27 ++++++---- 3 files changed, 58 insertions(+), 23 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index 92ed6f1afb..ecefb7a288 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -97,7 +97,7 @@ func newBootstrapManager( processProvider: opts.BootstrapProcessProvider(), sleepFn: time.Sleep, nowFn: opts.ClockOptions().NowFn(), - instrumentation: newInstrumentation(opts), + instrumentation: newBootstrapInstrumentation(opts), } m.bootstrapFn = m.bootstrap return m diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go index d2e29c35f2..069861f3d1 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go @@ -45,6 +45,21 @@ type instrumentationContext struct { bootstrapIndexDuration tally.Timer } +func newInstrumentationContext( + nowFn clock.NowFn, + log *zap.Logger, + span opentracing.Span, + scope tally.Scope, +) *instrumentationContext { + return &instrumentationContext{ + nowFn: nowFn, + log: log, + span: span, + bootstrapDataDuration: scope.Timer("data-duration"), + bootstrapIndexDuration: scope.Timer("index-duration"), + } +} + func (i *instrumentationContext) finish() { i.span.Finish() } @@ -87,6 +102,19 @@ type instrumentationReadShardsContext struct { bootstrapShardsDuration tally.Timer } +func newInstrumentationReadShardsContext( + nowFn clock.NowFn, + log *zap.Logger, + scope tally.Scope, +) *instrumentationReadShardsContext { + return &instrumentationReadShardsContext{ + nowFn: nowFn, + log: log, + start: nowFn(), + bootstrapShardsDuration: scope.Timer("shards-duration"), + } +} + func (i *instrumentationReadShardsContext) bootstrapShardsCompleted() { duration := i.nowFn().Sub(i.start) i.bootstrapShardsDuration.Record(duration) @@ -121,13 +149,12 @@ func (i *instrumentation) peersBootstrapperSourceReadStarted( ctx context.Context, ) *instrumentationContext { _, span, _ := ctx.StartSampledTraceSpan(tracepoint.BootstrapperPeersSourceRead) - return &instrumentationContext{ - nowFn: i.nowFn, - log: i.log, - span: span, - bootstrapDataDuration: i.scope.Timer("data-duration"), - bootstrapIndexDuration: i.scope.Timer("index-duration"), - } + return newInstrumentationContext( + i.nowFn, + i.log, + span, + i.scope, + ) } func (i *instrumentation) getDefaultAdminSessionFailed(err error) { @@ -143,12 +170,11 @@ func (i *instrumentation) bootstrapShardsStarted( zap.Int("shards", count), zap.Int("concurrency", concurrency), zap.Bool("shouldPersist", shouldPersist)) - return &instrumentationReadShardsContext{ - nowFn: i.nowFn, - log: i.log, - start: i.nowFn(), - bootstrapShardsDuration: i.scope.Timer("shards-duration"), - } + return newInstrumentationReadShardsContext( + i.nowFn, + i.log, + i.scope, + ) } func (i *instrumentation) persistenceFlushFailed(err error) { diff --git a/src/dbnode/storage/bootstrap_instrumentation.go b/src/dbnode/storage/bootstrap_instrumentation.go index d004712de3..a1e2eae7f4 100644 --- a/src/dbnode/storage/bootstrap_instrumentation.go +++ b/src/dbnode/storage/bootstrap_instrumentation.go @@ -42,6 +42,22 @@ type instrumentationContext struct { instrumentOptions instrument.Options } +func newInstrumentationContext( + nowFn clock.NowFn, + log *zap.Logger, + scope tally.Scope, + opts Options, +) *instrumentationContext { + return &instrumentationContext{ + start: nowFn(), + log: log, + nowFn: nowFn, + bootstrapDuration: scope.Timer("bootstrap-duration"), + bootstrapNamespacesDuration: scope.Timer("bootstrap-namespaces-duration"), + instrumentOptions: opts.InstrumentOptions(), + } +} + func (i *instrumentationContext) bootstrapStarted(shards int) { i.logFields = append(i.logFields, zap.Int("numShards", shards)) i.log.Info("bootstrap started", i.logFields...) @@ -105,7 +121,7 @@ type bootstrapInstrumentation struct { numRetries tally.Counter } -func newInstrumentation(opts Options) *bootstrapInstrumentation { +func newBootstrapInstrumentation(opts Options) *bootstrapInstrumentation { scope := opts.InstrumentOptions().MetricsScope() return &bootstrapInstrumentation{ opts: opts, @@ -120,14 +136,7 @@ func newInstrumentation(opts Options) *bootstrapInstrumentation { func (i *bootstrapInstrumentation) bootstrapPreparing() *instrumentationContext { i.log.Info("bootstrap prepare") - return &instrumentationContext{ - start: i.nowFn(), - log: i.log, - nowFn: i.nowFn, - bootstrapDuration: i.scope.Timer("bootstrap-duration"), - bootstrapNamespacesDuration: i.scope.Timer("bootstrap-namespaces-duration"), - instrumentOptions: i.opts.InstrumentOptions(), - } + return newInstrumentationContext(i.nowFn, i.log, i.scope, i.opts) } func (i *bootstrapInstrumentation) bootstrapFnFailed(retry int) { From e7653754361a54f1b526edeac20cba7a467fa9d4 Mon Sep 17 00:00:00 2001 From: Linas Naginionis Date: Wed, 13 Jan 2021 15:15:37 +0200 Subject: [PATCH 10/10] changes after code review --- src/dbnode/storage/bootstrap.go | 9 +- .../bootstrap/bootstrapper/peers/source.go | 60 +++++++++----- .../peers/source_instrumentation.go | 83 ------------------- .../storage/bootstrap_instrumentation.go | 8 +- 4 files changed, 45 insertions(+), 115 deletions(-) diff --git a/src/dbnode/storage/bootstrap.go b/src/dbnode/storage/bootstrap.go index ecefb7a288..8481389cf9 100644 --- a/src/dbnode/storage/bootstrap.go +++ b/src/dbnode/storage/bootstrap.go @@ -30,7 +30,6 @@ import ( "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" xerrors "github.com/m3db/m3/src/x/errors" - "github.com/m3db/m3/src/x/instrument" xtime "github.com/m3db/m3/src/x/time" ) @@ -168,7 +167,7 @@ func (m *bootstrapManager) Bootstrap() (BootstrapResult, error) { // NB(r): Last bootstrap failed, since this could be due to transient // failure we retry the bootstrap again. This is to avoid operators // needing to manually intervene for cases where failures are transient. - m.instrumentation.bootstrapFnFailed(i + 1) + m.instrumentation.bootstrapFailed(i + 1) m.sleepFn(bootstrapRetryInterval) continue } @@ -221,8 +220,7 @@ func (m *bootstrapManager) bootstrap() error { // an error returned. for _, accumulator := range accmulators { if err := accumulator.Close(); err != nil { - instrument.EmitAndLogInvariantViolation(instrCtx.instrumentOptions, - instrCtx.logFn(err, "could not close bootstrap data accumulator")) + instrCtx.emitAndLogInvariantViolation(err, "could not close bootstrap data accumulator") } } }() @@ -321,8 +319,7 @@ func (m *bootstrapManager) bootstrap() error { if !ok { err := fmt.Errorf("missing namespace from bootstrap result: %v", id.String()) - instrument.EmitAndLogInvariantViolation(instrCtx.instrumentOptions, - instrCtx.logFn(err, "bootstrap failed")) + instrCtx.emitAndLogInvariantViolation(err, "bootstrap failed") return err } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go index bf6afdf2f5..d451dca727 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source.go @@ -58,6 +58,7 @@ import ( type peersSource struct { opts Options newPersistManager func() (persist.Manager, error) + log *zap.Logger instrumentation *instrumentation } @@ -73,12 +74,14 @@ func newPeersSource(opts Options) (bootstrap.Source, error) { return nil, err } + instrumentation := newInstrumentation(opts) return &peersSource{ opts: opts, newPersistManager: func() (persist.Manager, error) { return fs.NewPersistManager(opts.FilesystemOptions()) }, - instrumentation: newInstrumentation(opts), + log: instrumentation.log, + instrumentation: instrumentation, }, nil } @@ -170,7 +173,8 @@ func (s *peersSource) Read( namespace := elem.Value() md := namespace.Metadata if !md.Options().IndexOptions().Enabled() { - instrCtx.bootstrapIndexSkipped(md.ID()) + s.log.Info("skipping bootstrap for namespace based on options", + zap.Stringer("namespace", md.ID())) continue } @@ -230,7 +234,7 @@ func (s *peersSource) readData( result := result.NewDataBootstrapResult() session, err := s.opts.AdminClient().DefaultAdminSession() if err != nil { - s.instrumentation.getDefaultAdminSessionFailed(err) + s.log.Error("peers bootstrapper cannot get default admin session", zap.Error(err)) result.SetUnfulfilled(shardTimeRanges) return nil, err } @@ -251,6 +255,7 @@ func (s *peersSource) readData( } instrCtx := s.instrumentation.bootstrapShardsStarted(count, concurrency, shouldPersist) + defer instrCtx.bootstrapShardsCompleted() if shouldPersist { // Spin up persist workers. for i := 0; i < s.opts.ShardPersistenceFlushConcurrency(); i++ { @@ -294,7 +299,6 @@ func (s *peersSource) readData( } } - instrCtx.bootstrapShardsCompleted() return result, nil } @@ -353,7 +357,8 @@ func (s *peersSource) runPersistenceQueueWorkerLoop( } // Remove results and make unfulfilled if an error occurred. - s.instrumentation.persistenceFlushFailed(err) + s.log.Error("peers bootstrapper bootstrap with persistence flush encountered error", + zap.Error(err)) // Make unfulfilled. lock.Lock() @@ -431,14 +436,14 @@ func (s *peersSource) fetchBootstrapBlocksFromPeers( continue } unfulfill(currRange) - s.instrumentation.seriesCheckoutFailed(err) + s.log.Error("could not checkout series", zap.Error(err)) continue } for _, block := range entry.Blocks.AllBlocks() { if err := ref.Series.LoadBlock(block, series.WarmWrite); err != nil { unfulfill(currRange) - s.instrumentation.seriesLoadFailed(err) + s.log.Error("could not load series block", zap.Error(err)) } } @@ -456,7 +461,10 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( err error, ) { if err != nil { - s.instrumentation.fetchBootstrapBlocksFailed(err, shard) + s.log.Error("error fetching bootstrap blocks", + zap.Uint32("shard", shard), + zap.Error(err), + ) return } @@ -469,7 +477,11 @@ func (s *peersSource) logFetchBootstrapBlocksFromPeersOutcome( } for block, numSeries := range shardBlockSeriesCounter { - s.instrumentation.shardBootstrapped(shard, numSeries, block.ToTime()) + s.log.Info("peer bootstrapped shard", + zap.Uint32("shard", shard), + zap.Int64("numSeries", numSeries), + zap.Time("blockStart", block.ToTime()), + ) } } @@ -681,7 +693,8 @@ func (s *peersSource) readIndex( indexSegmentConcurrency = s.opts.IndexSegmentConcurrency() readersCh = make(chan bootstrapper.TimeWindowReaders, indexSegmentConcurrency) ) - s.instrumentation.peersBootstrapperIndexForRanges(count) + s.log.Info("peers bootstrapper bootstrapping index for ranges", + zap.Int("shards", count)) go bootstrapper.EnqueueReaders(bootstrapper.EnqueueReadersOptions{ NsMD: ns, @@ -861,7 +874,8 @@ func (s *peersSource) processReaders( xtime.NewRanges(timeRange), )) } else { - s.instrumentation.processingReadersFailed(err, start) + s.log.Error("error processing readers", zap.Error(err), + zap.Time("timeRange.start", start)) timesWithErrors = append(timesWithErrors, timeRange.Start) } } @@ -905,7 +919,7 @@ func (s *peersSource) processReaders( zap.String("remainingRanges", remainingRanges.SummaryString()), } if shouldPersist { - s.instrumentation.buildingFileSetIndexSegmentStarted(buildIndexLogFields) + s.log.Debug("building file set index segment", buildIndexLogFields...) indexBlock, err = bootstrapper.PersistBootstrapIndexSegment( ns, requestedRanges, @@ -932,7 +946,7 @@ func (s *peersSource) processReaders( }) } } else { - s.instrumentation.buildingInMemoryIndexSegmentStarted(buildIndexLogFields) + s.log.Info("building in-memory index segment", buildIndexLogFields...) indexBlock, err = bootstrapper.BuildBootstrapIndexSegment( ns, requestedRanges, @@ -1019,9 +1033,9 @@ func (s *peersSource) markRunResultErrorsAndUnfulfilled( for i := range timesWithErrors { timesWithErrorsString[i] = timesWithErrors[i].String() } - s.instrumentation.errorsForRangeEncountered( - remainingRanges.SummaryString(), - timesWithErrorsString) + s.log.Info("encountered errors for range", + zap.String("requestedRanges", remainingRanges.SummaryString()), + zap.Strings("timesWithErrors", timesWithErrorsString)) } if !remainingRanges.IsEmpty() { @@ -1099,17 +1113,19 @@ func (s *peersSource) peerAvailability( if available == 0 { // Can't peer bootstrap if there are no available peers. - s.instrumentation.noPeersAvailable(total, shardIDUint) + s.log.Debug("0 available peers, unable to peer bootstrap", + zap.Int("total", total), + zap.Uint32("shard", shardIDUint)) continue } if !topology.ReadConsistencyAchieved( bootstrapConsistencyLevel, majorityReplicas, total, available) { - s.instrumentation.readConsistencyNotAchieved( - bootstrapConsistencyLevel, - majorityReplicas, - total, - available) + s.log.Debug("read consistency not achieved, unable to peer bootstrap", + zap.Any("level", bootstrapConsistencyLevel), + zap.Int("replicas", majorityReplicas), + zap.Int("total", total), + zap.Int("available", available)) continue } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go index 069861f3d1..72d2cd12bd 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/peers/source_instrumentation.go @@ -29,11 +29,9 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" - "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" - "github.com/m3db/m3/src/x/ident" ) type instrumentationContext struct { @@ -83,11 +81,6 @@ func (i *instrumentationContext) bootstrapIndexStarted() { i.start = i.nowFn() } -func (i *instrumentationContext) bootstrapIndexSkipped(namespaceID ident.ID) { - i.log.Info("skipping bootstrap for namespace based on options", - zap.Stringer("namespace", namespaceID)) -} - func (i *instrumentationContext) bootstrapIndexCompleted() { duration := i.nowFn().Sub(i.start) i.bootstrapIndexDuration.Record(duration) @@ -157,10 +150,6 @@ func (i *instrumentation) peersBootstrapperSourceReadStarted( ) } -func (i *instrumentation) getDefaultAdminSessionFailed(err error) { - i.log.Error("peers bootstrapper cannot get default admin session", zap.Error(err)) -} - func (i *instrumentation) bootstrapShardsStarted( count int, concurrency int, @@ -177,79 +166,7 @@ func (i *instrumentation) bootstrapShardsStarted( ) } -func (i *instrumentation) persistenceFlushFailed(err error) { - i.log.Error("peers bootstrapper bootstrap with persistence flush encountered error", - zap.Error(err)) -} - -func (i *instrumentation) seriesCheckoutFailed(err error) { - i.log.Error("could not checkout series", zap.Error(err)) -} - -func (i *instrumentation) seriesLoadFailed(err error) { - i.log.Error("could not load series block", zap.Error(err)) -} - -func (i *instrumentation) shardBootstrapped(shard uint32, numSeries int64, blockTime time.Time) { - i.log.Info("peer bootstrapped shard", - zap.Uint32("shard", shard), - zap.Int64("numSeries", numSeries), - zap.Time("blockStart", blockTime), - ) -} - -func (i *instrumentation) fetchBootstrapBlocksFailed(err error, shard uint32) { - i.log.Error("error fetching bootstrap blocks", - zap.Uint32("shard", shard), - zap.Error(err), - ) -} - -func (i *instrumentation) peersBootstrapperIndexForRanges(count int) { - i.log.Info("peers bootstrapper bootstrapping index for ranges", - zap.Int("shards", count), - ) -} - -func (i *instrumentation) processingReadersFailed(err error, start time.Time) { - i.log.Error("error processing readers", zap.Error(err), - zap.Time("timeRange.start", start)) -} - -func (i *instrumentation) buildingFileSetIndexSegmentStarted(fields []zapcore.Field) { - i.log.Debug("building file set index segment", fields...) -} - func (i *instrumentation) outOfRetentionIndexSegmentSkipped(fields []zapcore.Field) { i.log.Debug("skipping out of retention index segment", fields...) i.persistedIndexBlocksOutOfRetention.Inc(1) } - -func (i *instrumentation) buildingInMemoryIndexSegmentStarted(fields []zapcore.Field) { - i.log.Info("building in-memory index segment", fields...) -} - -func (i *instrumentation) errorsForRangeEncountered(summaryString string, errorsString []string) { - i.log.Info("encountered errors for range", - zap.String("requestedRanges", summaryString), - zap.Strings("timesWithErrors", errorsString)) -} - -func (i *instrumentation) noPeersAvailable(total int, shardIDUint uint32) { - i.log.Debug("0 available peers, unable to peer bootstrap", - zap.Int("total", total), - zap.Uint32("shard", shardIDUint)) -} - -func (i *instrumentation) readConsistencyNotAchieved( - bootstrapConsistencyLevel topology.ReadConsistencyLevel, - majorityReplicas int, - total int, - available int, -) { - i.log.Debug("read consistency not achieved, unable to peer bootstrap", - zap.Any("level", bootstrapConsistencyLevel), - zap.Int("replicas", majorityReplicas), - zap.Int("total", total), - zap.Int("available", available)) -} diff --git a/src/dbnode/storage/bootstrap_instrumentation.go b/src/dbnode/storage/bootstrap_instrumentation.go index a1e2eae7f4..37e593fd92 100644 --- a/src/dbnode/storage/bootstrap_instrumentation.go +++ b/src/dbnode/storage/bootstrap_instrumentation.go @@ -105,10 +105,10 @@ func (i *instrumentationContext) bootstrapNamespacesFailed(err error) { i.log.Info("bootstrap namespaces failed", append(i.logFields, zap.Error(err))...) } -func (i *instrumentationContext) logFn(err error, msg string) func(l *zap.Logger) { - return func(l *zap.Logger) { +func (i *instrumentationContext) emitAndLogInvariantViolation(err error, msg string) { + instrument.EmitAndLogInvariantViolation(i.instrumentOptions, func(l *zap.Logger) { l.Error(msg, append(i.logFields, zap.Error(err))...) - } + }) } type bootstrapInstrumentation struct { @@ -139,7 +139,7 @@ func (i *bootstrapInstrumentation) bootstrapPreparing() *instrumentationContext return newInstrumentationContext(i.nowFn, i.log, i.scope, i.opts) } -func (i *bootstrapInstrumentation) bootstrapFnFailed(retry int) { +func (i *bootstrapInstrumentation) bootstrapFailed(retry int) { i.numRetries.Inc(1) i.log.Warn("retrying bootstrap after backoff", zap.Duration("backoff", bootstrapRetryInterval),