diff --git a/src/dbnode/persist/fs/index_read.go b/src/dbnode/persist/fs/index_read.go index 773c7b5291..5b93a34225 100644 --- a/src/dbnode/persist/fs/index_read.go +++ b/src/dbnode/persist/fs/index_read.go @@ -22,6 +22,7 @@ package fs import ( "bytes" + "errors" "fmt" "io" "io/ioutil" @@ -41,6 +42,9 @@ const ( mmapPersistFsIndexName = "mmap.persist.fs.index" ) +// ErrIndexReaderValidationFailed is returned for corrupt index segemnts. +var ErrIndexReaderValidationFailed = errors.New("validation failed") + type indexReader struct { opts Options filePathPrefix string @@ -305,16 +309,16 @@ func (r *indexReader) Validate() error { func (r *indexReader) validateDigestsFileDigest() error { if r.readDigests.digestsFileDigest != r.expectedDigestOfDigest { - return fmt.Errorf("read digests file checksum bad: expected=%d, actual=%d", - r.expectedDigestOfDigest, r.readDigests.digestsFileDigest) + return fmt.Errorf("(%w) read digests file checksum bad: expected=%d, actual=%d", + ErrIndexReaderValidationFailed, r.expectedDigestOfDigest, r.readDigests.digestsFileDigest) } return nil } func (r *indexReader) validateInfoFileDigest() error { if r.readDigests.infoFileDigest != r.expectedDigest.InfoDigest { - return fmt.Errorf("read info file checksum bad: expected=%d, actual=%d", - r.expectedDigest.InfoDigest, r.readDigests.infoFileDigest) + return fmt.Errorf("(%w) read info file checksum bad: expected=%d, actual=%d", + ErrIndexReaderValidationFailed, r.expectedDigest.InfoDigest, r.readDigests.infoFileDigest) } return nil } @@ -322,35 +326,37 @@ func (r *indexReader) validateInfoFileDigest() error { func (r *indexReader) validateSegmentFileDigest(segmentIdx, fileIdx int) error { if segmentIdx >= len(r.readDigests.segments) { return fmt.Errorf( - "have not read correct number of segments to validate segment %d checksums: "+ + "(%w) have not read correct number of segments to validate segment %d checksums: "+ "need=%d, actual=%d", - segmentIdx, segmentIdx+1, len(r.readDigests.segments)) + ErrIndexReaderValidationFailed, segmentIdx, segmentIdx+1, len(r.readDigests.segments)) } if segmentIdx >= len(r.expectedDigest.SegmentDigests) { return fmt.Errorf( - "have not read digest files correctly to validate segment %d checksums: "+ + "(%w) have not read digest files correctly to validate segment %d checksums: "+ "need=%d, actual=%d", - segmentIdx, segmentIdx+1, len(r.expectedDigest.SegmentDigests)) + ErrIndexReaderValidationFailed, segmentIdx, segmentIdx+1, len(r.expectedDigest.SegmentDigests)) } if fileIdx >= len(r.readDigests.segments[segmentIdx].files) { return fmt.Errorf( - "have not read correct number of segment files to validate segment %d checksums: "+ + "(%w) have not read correct number of segment files to validate segment %d checksums: "+ "need=%d, actual=%d", - segmentIdx, fileIdx+1, len(r.readDigests.segments[segmentIdx].files)) + ErrIndexReaderValidationFailed, segmentIdx, fileIdx+1, + len(r.readDigests.segments[segmentIdx].files)) } if fileIdx >= len(r.expectedDigest.SegmentDigests[segmentIdx].Files) { return fmt.Errorf( - "have not read correct number of segment files to validate segment %d checksums: "+ + "(%w) have not read correct number of segment files to validate segment %d checksums: "+ "need=%d, actual=%d", - segmentIdx, fileIdx+1, len(r.expectedDigest.SegmentDigests[segmentIdx].Files)) + ErrIndexReaderValidationFailed, segmentIdx, fileIdx+1, + len(r.expectedDigest.SegmentDigests[segmentIdx].Files)) } expected := r.expectedDigest.SegmentDigests[segmentIdx].Files[fileIdx].Digest actual := r.readDigests.segments[segmentIdx].files[fileIdx].digest if actual != expected { - return fmt.Errorf("read segment file %d for segment %d checksum bad: expected=%d, actual=%d", - segmentIdx, fileIdx, expected, actual) + return fmt.Errorf("(%w) read segment file %d for segment %d checksum bad: expected=%d, actual=%d", + ErrIndexReaderValidationFailed, segmentIdx, fileIdx, expected, actual) } return nil } diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 4b60e80b70..ff17eb8db0 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -123,7 +123,7 @@ type DataFileSetReaderStatus struct { // DataReaderOpenOptions is options struct for the reader open method. type DataReaderOpenOptions struct { // Identifier allows to identify a FileSetFile. - Identifier FileSetFileIdentifier + Identifier FileSetFileIdentifier // FileSetType is the file set type. FileSetType persist.FileSetType // StreamingEnabled enables using streaming methods, such as DataFileSetReader.StreamingRead. @@ -680,3 +680,15 @@ type CrossBlockIterator interface { // Reset resets the iterator to the given block records. Reset(records []BlockRecord) } + +// IndexClaimsManager manages concurrent claims to volume indices per ns and block start. +// This allows multiple threads to safely increment the volume index. +type IndexClaimsManager interface { + ClaimNextIndexFileSetVolumeIndex( + md namespace.Metadata, + blockStart time.Time, + ) (int, error) +} + +// DeleteFilesFn deletes files passed in as arg. +type DeleteFilesFn func(files []string) error diff --git a/src/dbnode/storage/bootstrap/bootstrap_mock.go b/src/dbnode/storage/bootstrap/bootstrap_mock.go index c5a420d702..7c5c256aef 100644 --- a/src/dbnode/storage/bootstrap/bootstrap_mock.go +++ b/src/dbnode/storage/bootstrap/bootstrap_mock.go @@ -700,6 +700,21 @@ func (mr *MockCacheMockRecorder) InfoFilesForShard(ns, shard interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InfoFilesForShard", reflect.TypeOf((*MockCache)(nil).InfoFilesForShard), ns, shard) } +// IndexInfoFilesForNamespace mocks base method +func (m *MockCache) IndexInfoFilesForNamespace(ns namespace.Metadata) ([]fs.ReadIndexInfoFileResult, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IndexInfoFilesForNamespace", ns) + ret0, _ := ret[0].([]fs.ReadIndexInfoFileResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// IndexInfoFilesForNamespace indicates an expected call of IndexInfoFilesForNamespace +func (mr *MockCacheMockRecorder) IndexInfoFilesForNamespace(ns interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IndexInfoFilesForNamespace", reflect.TypeOf((*MockCache)(nil).IndexInfoFilesForNamespace), ns) +} + // ReadInfoFiles mocks base method func (m *MockCache) ReadInfoFiles() InfoFilesByNamespace { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/bootstrap/bootstrapper/README.md b/src/dbnode/storage/bootstrap/bootstrapper/README.md index b062aaac3d..947ea7f10a 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/README.md +++ b/src/dbnode/storage/bootstrap/bootstrapper/README.md @@ -6,6 +6,7 @@ The collection of bootstrappers comprise the task executed when bootstrapping a - `fs`: The filesystem bootstrapper, used to bootstrap as much data as possible from the local filesystem. - `peers`: The peers bootstrapper, used to bootstrap any remaining data from peers. This is used for a full node join too. + - *NOTE*: For the node leave case, the peers bs will persist default volume type index filesets to disk with non-overlapping shard time ranges to avoid re-building the entire index segment w/ new shards. - `commitlog`: The commit log bootstrapper, currently only used in the case that peers bootstrapping fails. Once the current block is being snapshotted frequently to disk it might be faster and make more sense to not actively use the peers bootstrapper and just use a combination of the filesystem bootstrapper and the minimal time range required from the commit log bootstrapper. - *NOTE*: the commitlog bootstrapper is special cased in that it runs for the *entire* bootstrappable range per shard whereas other bootstrappers fill in the unfulfilled gaps as bootstrapping progresses. diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go index bfc9aea8a5..b32efd0636 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source.go @@ -21,11 +21,13 @@ package fs import ( + "errors" "fmt" "sync" "time" - "github.com/m3db/m3/src/dbnode/clock" + dbnodeclock "github.com/m3db/m3/src/dbnode/clock" + indexpb "github.com/m3db/m3/src/dbnode/generated/proto/index" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" @@ -37,14 +39,18 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs/migrator" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/dbnode/storage/index" + "github.com/m3db/m3/src/dbnode/storage/index/compaction" "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/tracepoint" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/index/segment/fst" idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/checked" + "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" @@ -77,14 +83,14 @@ type fileSystemSource struct { idPool ident.Pool newReaderFn newDataFileSetReaderFn newReaderPoolOpts bootstrapper.NewReaderPoolOptions - persistManager *bootstrapper.SharedPersistManager - compactor *bootstrapper.SharedCompactor + deleteFilesFn fs.DeleteFilesFn metrics fileSystemSourceMetrics } type fileSystemSourceMetrics struct { - persistedIndexBlocksRead tally.Counter - persistedIndexBlocksWrite tally.Counter + persistedIndexBlocksRead tally.Counter + persistedIndexBlocksWrite tally.Counter + indexBlocksFailedValidation tally.Counter } func newFileSystemSource(opts Options) (bootstrap.Source, error) { @@ -98,21 +104,17 @@ func newFileSystemSource(opts Options) (bootstrap.Source, error) { opts = opts.SetInstrumentOptions(iopts) s := &fileSystemSource{ - opts: opts, - fsopts: opts.FilesystemOptions(), - log: iopts.Logger().With(zap.String("bootstrapper", "filesystem")), - nowFn: opts.ResultOptions().ClockOptions().NowFn(), - idPool: opts.IdentifierPool(), - newReaderFn: fs.NewReader, - persistManager: &bootstrapper.SharedPersistManager{ - Mgr: opts.PersistManager(), - }, - compactor: &bootstrapper.SharedCompactor{ - Compactor: opts.Compactor(), - }, + opts: opts, + fsopts: opts.FilesystemOptions(), + log: iopts.Logger().With(zap.String("bootstrapper", "filesystem")), + nowFn: clock.NowFn(opts.ResultOptions().ClockOptions().NowFn()), + idPool: opts.IdentifierPool(), + newReaderFn: fs.NewReader, + deleteFilesFn: fs.DeleteFiles, metrics: fileSystemSourceMetrics{ - persistedIndexBlocksRead: scope.Counter("persist-index-blocks-read"), - persistedIndexBlocksWrite: scope.Counter("persist-index-blocks-write"), + persistedIndexBlocksRead: scope.Counter("persist-index-blocks-read"), + persistedIndexBlocksWrite: scope.Counter("persist-index-blocks-write"), + indexBlocksFailedValidation: scope.Counter("index-blocks-failed-validation"), }, } s.newReaderPoolOpts.Alloc = s.newReader @@ -328,14 +330,15 @@ func (s *fileSystemSource) bootstrapFromReaders( ns namespace.Metadata, accumulator bootstrap.NamespaceDataAccumulator, runOpts bootstrap.RunOptions, + runResult *runResult, readerPool *bootstrapper.ReaderPool, readersCh <-chan bootstrapper.TimeWindowReaders, builder *result.IndexBuilder, -) *runResult { - var ( - runResult = newRunResult() - resultOpts = s.opts.ResultOptions() - ) + persistManager *bootstrapper.SharedPersistManager, + compactor *bootstrapper.SharedCompactor, + cache bootstrap.Cache, +) { + resultOpts := s.opts.ResultOptions() for timeWindowReaders := range readersCh { // NB(bodu): Since we are re-using the same builder for all bootstrapped index blocks, @@ -343,10 +346,9 @@ func (s *fileSystemSource) bootstrapFromReaders( builder.Builder().Reset() s.loadShardReadersDataIntoShardResult(run, ns, accumulator, - runOpts, runResult, resultOpts, timeWindowReaders, readerPool, builder) + runOpts, runResult, resultOpts, timeWindowReaders, readerPool, + builder, persistManager, compactor, cache) } - - return runResult } // markRunResultErrorsAndUnfulfilled checks the list of times that had errors and makes @@ -397,6 +399,9 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( timeWindowReaders bootstrapper.TimeWindowReaders, readerPool *bootstrapper.ReaderPool, builder *result.IndexBuilder, + persistManager *bootstrapper.SharedPersistManager, + compactor *bootstrapper.SharedCompactor, + cache bootstrap.Cache, ) { var ( blockPool = ropts.DatabaseBlockOptions().DatabaseBlockPool() @@ -486,9 +491,11 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( if err == nil && run == bootstrapIndexRunType { // Mark index block as fulfilled. fulfilled := result.NewShardTimeRanges().Set(shard, xtime.NewRanges(timeRange)) + runResult.Lock() err = runResult.index.IndexResults().MarkFulfilled(start, fulfilled, // NB(bodu): By default, we always load bootstrapped data into the default index volume. idxpersist.DefaultIndexVolumeType, ns.Options().IndexOptions()) + runResult.Unlock() if err != nil { s.log.Error("indexResults MarkFulfilled failed", zap.Error(err), zap.Time("timeRangeStart", timeRange.Start)) @@ -542,7 +549,9 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( fulfilledMin, fulfilledMax := totalFulfilledRanges.MinMax() // NB(bodu): Assume if we're bootstrapping data from disk that it is the "default" index volume type. + runResult.Lock() existingIndexBlock, ok := bootstrapper.GetDefaultIndexBlockForBlockStart(runResult.index.IndexResults(), blockStart) + runResult.Unlock() if !ok { err := fmt.Errorf("could not find index block in results: time=%s, ts=%d", blockStart.String(), blockStart.UnixNano()) @@ -579,6 +588,26 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( } if shouldFlush && satisifiedFlushRanges { + // NB(bodu): If we are persisting an index segment to disk, we need to delete any existing + // index filesets at this block start. The newly persisted index segments becomes the new source of truth. + var ( + filesToDelete = []string{} + persistSuccess bool + ) + defer func() { + if persistSuccess { + if err := s.deleteFilesFn(filesToDelete); err != nil { + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("failed to delete non default index filesets", + zap.Error(err), + zap.Stringer("namespace", ns.ID()), + zap.Stringer("requestedRanges", requestedRanges)) + }) + } + } + }() + filesToDelete = s.appendIndexFilesetFilesToDelete(ns, blockStart, cache, filesToDelete, runResult, iopts) + // Use debug level with full log fidelity. s.log.Debug("building file set index segment", buildIndexLogFields...) // Use info log with more high level attributes. @@ -587,17 +616,23 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( zap.Int("totalEntries", totalEntries), zap.Time("blockStart", blockStart), zap.Time("blockEnd", blockEnd)) + // NB(bodu): The index claims manager ensures that we properly advance the volume index + // past existing volume indices. indexBlock, err = bootstrapper.PersistBootstrapIndexSegment( ns, requestedRanges, builder.Builder(), - s.persistManager, + persistManager, s.opts.ResultOptions(), existingIndexBlock.Fulfilled(), blockStart, blockEnd, ) - if err != nil { + if err == nil { + // Track success. + s.metrics.persistedIndexBlocksWrite.Inc(1) + persistSuccess = true + } else { instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { l.Error("persist fs index bootstrap failed", zap.Error(err), @@ -605,15 +640,13 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( zap.Stringer("requestedRanges", requestedRanges)) }) } - // Track success. - s.metrics.persistedIndexBlocksWrite.Inc(1) } else { s.log.Info("building in-memory index segment", buildIndexLogFields...) indexBlock, err = bootstrapper.BuildBootstrapIndexSegment( ns, requestedRanges, builder.Builder(), - s.compactor, + compactor, s.opts.ResultOptions(), s.opts.FilesystemOptions().MmapReporter(), blockStart, @@ -639,7 +672,9 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( newFulfilled.AddRanges(indexBlock.Fulfilled()) // Replace index block for default index volume type. + runResult.Lock() runResult.index.IndexResults()[xtime.ToUnixNano(blockStart)].SetBlock(idxpersist.DefaultIndexVolumeType, result.NewIndexBlock(segments, newFulfilled)) + runResult.Unlock() } // Return readers to pool. @@ -655,6 +690,74 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult( remainingRanges, timesWithErrors) } +// appendIndexFilesetFilesToDelete appends all index filesets at a block start for deletion. +// Also removes index results from given block start since results are not complete and/or corrupted +// and require an index segment rebuild. +func (s *fileSystemSource) appendIndexFilesetFilesToDelete( + ns namespace.Metadata, + blockStart time.Time, + cache bootstrap.Cache, + filesToDelete []string, + runResult *runResult, + iopts instrument.Options, +) []string { + infoFiles, err := cache.IndexInfoFilesForNamespace(ns) + if err != nil { + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("failed to get index info files from cache", + zap.Error(err), + zap.Time("blockStart", blockStart), + zap.Stringer("namespace", ns.ID())) + }) + } + for i := range infoFiles { + if err := infoFiles[i].Err.Error(); err != nil { + // We already log errors once when bootstrapping from persisted + // index blocks just continue here. + continue + } + + info := infoFiles[i].Info + indexBlockStart := xtime.UnixNano(info.BlockStart).ToTime() + if blockStart.Equal(indexBlockStart) { + filesToDelete = append(filesToDelete, infoFiles[i].AbsoluteFilePaths...) + } + } + + // Remove index results for the block we're deleting. + if err := removeIndexResults(ns, blockStart, runResult); err != nil { + instrument.EmitAndLogInvariantViolation(iopts, func(l *zap.Logger) { + l.Error("error removing partial/corrupted index results", + zap.Error(err), + zap.Time("blockStart", blockStart), + zap.Stringer("namespace", ns.ID())) + }) + } + + return filesToDelete +} + +func removeIndexResults( + ns namespace.Metadata, + blockStart time.Time, + runResult *runResult, +) error { + runResult.Lock() + defer runResult.Unlock() + + multiErr := xerrors.NewMultiError() + results, ok := runResult.index.IndexResults()[xtime.ToUnixNano(blockStart)] + if ok { + for volumeType, indexBlock := range results.Iter() { + for _, seg := range indexBlock.Segments() { + multiErr = multiErr.Add(seg.Segment().Close()) + } + results.DeleteBlock(volumeType) + } + } + return multiErr.FinalError() +} + func (s *fileSystemSource) readNextEntryAndRecordBlock( nsCtx namespace.Context, accumulator bootstrap.NamespaceDataAccumulator, @@ -752,21 +855,17 @@ func (s *fileSystemSource) read( ) (*runResult, error) { var ( seriesCachePolicy = s.opts.ResultOptions().SeriesCachePolicy() - res *runResult + res = newRunResult() ) if shardTimeRanges.IsEmpty() { return newRunResult(), nil } - setOrMergeResult := func(newResult *runResult) { + mergeResult := func(newResult *runResult) { if newResult == nil { return } - if res == nil { - res = newResult - } else { - res = res.mergedResult(newResult) - } + res = res.mergedResult(newResult) } if run == bootstrapDataRunType { @@ -790,7 +889,7 @@ func (s *fileSystemSource) read( // subtract the shard + time ranges from what we intend to bootstrap // for those we found. r, err := s.bootstrapFromIndexPersistedBlocks(md, - shardTimeRanges) + shardTimeRanges, cache) if err != nil { s.log.Warn("filesystem bootstrapped failed to read persisted index blocks") } else { @@ -798,7 +897,7 @@ func (s *fileSystemSource) read( shardTimeRanges = shardTimeRanges.Copy() shardTimeRanges.Subtract(r.fulfilled) // Set or merge result. - setOrMergeResult(r.result) + mergeResult(r.result) } logSpan("bootstrap_from_index_persisted_blocks_done") } @@ -807,7 +906,8 @@ func (s *fileSystemSource) read( // allocate and keep around readers outside of the bootstrapping process, // hence why its created on demand each time. readerPool := bootstrapper.NewReaderPool(s.newReaderPoolOpts) - readersCh := make(chan bootstrapper.TimeWindowReaders) + indexSegmentConcurrency := 1 + readersCh := make(chan bootstrapper.TimeWindowReaders, indexSegmentConcurrency) var blockSize time.Duration switch run { case bootstrapDataRunType: @@ -832,14 +932,55 @@ func (s *fileSystemSource) read( OptimizedReadMetadataOnly: run == bootstrapIndexRunType, Logger: s.log, Span: span, - NowFn: s.nowFn, + NowFn: dbnodeclock.NowFn(s.nowFn), Cache: cache, }) - bootstrapFromDataReadersResult := s.bootstrapFromReaders(run, md, - accumulator, runOpts, readerPool, readersCh, builder) - // Merge any existing results if necessary. - setOrMergeResult(bootstrapFromDataReadersResult) + var buildWg sync.WaitGroup + for i := 0; i < indexSegmentConcurrency; i++ { + alloc := s.opts.ResultOptions().IndexDocumentsBuilderAllocator() + segBuilder, err := alloc() + if err != nil { + return nil, err + } + + builder := result.NewIndexBuilder(segBuilder) + + indexOpts := s.opts.IndexOptions() + compactor, err := compaction.NewCompactor(indexOpts.DocumentArrayPool(), + index.DocumentArrayPoolCapacity, + indexOpts.SegmentBuilderOptions(), + indexOpts.FSTSegmentOptions(), + compaction.CompactorOptions{ + FSTWriterOptions: &fst.WriterOptions{ + // DisableRegistry is set to true to trade a larger FST size + // for a faster FST compaction since we want to reduce the end + // to end latency for time to first index a metric. + DisableRegistry: true, + }, + }) + if err != nil { + return nil, err + } + + persistManager, err := fs.NewPersistManager(s.opts.FilesystemOptions()) + if err != nil { + return nil, err + } + + buildWg.Add(1) + go func() { + s.bootstrapFromReaders(run, md, + accumulator, runOpts, res, + readerPool, readersCh, builder, + &bootstrapper.SharedPersistManager{Mgr: persistManager}, + &bootstrapper.SharedCompactor{Compactor: compactor}, + cache) + buildWg.Done() + }() + } + + buildWg.Wait() return res, nil } @@ -854,6 +995,7 @@ func (s *fileSystemSource) bootstrapDataRunResultFromAvailability( shardTimeRanges result.ShardTimeRanges, cache bootstrap.Cache, ) (*runResult, error) { + // No locking required, all local to this fn until returned. runResult := newRunResult() unfulfilled := runResult.data.Unfulfilled() for shard, ranges := range shardTimeRanges.Iter() { @@ -886,15 +1028,21 @@ type bootstrapFromIndexPersistedBlocksResult struct { func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( ns namespace.Metadata, shardTimeRanges result.ShardTimeRanges, + cache bootstrap.Cache, ) (bootstrapFromIndexPersistedBlocksResult, error) { res := bootstrapFromIndexPersistedBlocksResult{ fulfilled: result.NewShardTimeRanges(), } indexBlockSize := ns.Options().IndexOptions().BlockSize() - infoFiles := fs.ReadIndexInfoFiles(s.fsopts.FilePathPrefix(), ns.ID(), - s.fsopts.InfoReaderBufferSize()) + infoFiles, err := cache.IndexInfoFilesForNamespace(ns) + if err != nil { + return bootstrapFromIndexPersistedBlocksResult{}, err + } + // Track corrupted block starts as we will attempt to later recover + // from corruption by building an index segment from TSDB data. + corruptedBlockStarts := make(map[xtime.UnixNano]struct{}) for _, infoFile := range infoFiles { if err := infoFile.Err.Error(); err != nil { s.log.Error("unable to read index info file", @@ -907,7 +1055,18 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( } info := infoFile.Info - indexBlockStart := xtime.UnixNano(info.BlockStart).ToTime() + indexBlockStartUnixNanos := xtime.UnixNano(info.BlockStart) + indexBlockStart := indexBlockStartUnixNanos.ToTime() + + if _, ok := corruptedBlockStarts[indexBlockStartUnixNanos]; ok { + s.log.Info("index block corrupted skipping index info file", + zap.Stringer("namespace", ns.ID()), + zap.Stringer("blockStart", indexBlockStart), + zap.String("filepath", infoFile.Err.Filepath()), + ) + continue + } + indexBlockRange := xtime.Range{ Start: indexBlockStart, End: indexBlockStart.Add(indexBlockSize), @@ -963,6 +1122,19 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( zap.Time("blockStart", indexBlockStart), zap.Int("volumeIndex", infoFile.ID.VolumeIndex), ) + if errors.Is(err, fs.ErrIndexReaderValidationFailed) { + // Emit a metric for failed validations. + s.metrics.indexBlocksFailedValidation.Inc(1) + // Track corrupted blocks and remove any loaded results. + corruptedBlockStarts[indexBlockStartUnixNanos] = struct{}{} + if err := removeIndexResults(ns, indexBlockStart, res.result); err != nil { + s.log.Error("error removing partial/corrupted index results", + zap.Error(err), + zap.Stringer("namespace", ns.ID()), + zap.Time("blockStart", indexBlockStart), + ) + } + } continue } @@ -970,26 +1142,30 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks( s.metrics.persistedIndexBlocksRead.Inc(1) // Record result. - if res.result == nil { - res.result = newRunResult() - } segmentsFulfilled := willFulfill // NB(bodu): All segments read from disk are already persisted. persistedSegments := make([]result.Segment, 0, len(readResult.Segments)) for _, segment := range readResult.Segments { persistedSegments = append(persistedSegments, result.NewSegment(segment, true)) } - volumeType := idxpersist.DefaultIndexVolumeType - if info.IndexVolumeType != nil { - volumeType = idxpersist.IndexVolumeType(info.IndexVolumeType.Value) - } indexBlockByVolumeType := result.NewIndexBlockByVolumeType(indexBlockStart) + volumeType := volumeTypeFromInfo(&info) indexBlockByVolumeType.SetBlock(volumeType, result.NewIndexBlock(persistedSegments, segmentsFulfilled)) + + if res.result == nil { + res.result = newRunResult() + } // NB(r): Don't need to call MarkFulfilled on the IndexResults here // as we've already passed the ranges fulfilled to the block that // we place in the IndexResuts with the call to Add(...). res.result.index.Add(indexBlockByVolumeType, nil) - res.fulfilled.AddRanges(segmentsFulfilled) + + // NB(bodu): We only mark ranges as fulfilled for the default index volume type. + // It's possible to have other index volume types but the default type is required to + // fulfill bootstrappable ranges. + if volumeType == idxpersist.DefaultIndexVolumeType { + res.fulfilled.AddRanges(segmentsFulfilled) + } } return res, nil @@ -1021,8 +1197,22 @@ func (r *runResult) addIndexBlockIfNotExists( } func (r *runResult) mergedResult(other *runResult) *runResult { + r.Lock() + defer r.Unlock() + + other.Lock() + defer other.Unlock() + return &runResult{ data: result.MergedDataBootstrapResult(r.data, other.data), index: result.MergedIndexBootstrapResult(r.index, other.index), } } + +func volumeTypeFromInfo(info *indexpb.IndexVolumeInfo) idxpersist.IndexVolumeType { + volumeType := idxpersist.DefaultIndexVolumeType + if info.IndexVolumeType != nil { + volumeType = idxpersist.IndexVolumeType(info.IndexVolumeType.Value) + } + return volumeType +} diff --git a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go index 14f83aa76d..754bec8de9 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/fs/source_index_test.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/index/convert" "github.com/m3db/m3/src/m3ninx/index/segment/mem" idxpersist "github.com/m3db/m3/src/m3ninx/persist" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" "github.com/stretchr/testify/require" @@ -304,7 +305,7 @@ func validateGoodTaggedSeries( } } -func TestBootstrapIndex(t *testing.T) { +func TestBootstrapIndexAndUnfulfilledRanges(t *testing.T) { dir := createTempDir(t) defer os.RemoveAll(dir) @@ -334,9 +335,46 @@ func TestBootstrapIndex(t *testing.T) { times.shardTimeRanges, opts.FilesystemOptions(), nsMD) defer tester.Finish() + // Write out non default type index volume type index block and ensure + // that it gets deleted and is not loaded into the index results to test + // the unfulfilled shard time ranges case (missing default index volume type + // and/or index segments failed validation). + var ( + notDefaultIndexVolumeType = idxpersist.IndexVolumeType("not-default") + shards = map[uint32]struct{}{testShard: struct{}{}} + filesToDelete []string + ) + idxWriter, err := fs.NewIndexWriter(src.fsopts) + require.NoError(t, err) + require.NoError(t, idxWriter.Open(fs.IndexWriterOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + FileSetContentType: persist.FileSetIndexContentType, + Namespace: nsMD.ID(), + BlockStart: times.start, + VolumeIndex: 1, + }, + BlockSize: nsMD.Options().IndexOptions().BlockSize(), + FileSetType: persist.FileSetFlushType, + Shards: shards, + IndexVolumeType: notDefaultIndexVolumeType, + })) + // Don't need to write any actual data. + require.NoError(t, idxWriter.Close()) + src.deleteFilesFn = func(files []string) error { + filesToDelete = append(filesToDelete, files...) + multiErr := xerrors.NewMultiError() + for _, f := range files { + multiErr = multiErr.Add(os.Remove(f)) + } + return multiErr.FinalError() + } + tester.TestReadWith(src) indexResults := tester.ResultForNamespace(nsMD.ID()).IndexResult.IndexResults() + // Ensure we are attempting to delete a single index fileset (in this case w/ no data). + require.Len(t, filesToDelete, 3) + // Check that single persisted segment got written out infoFiles := fs.ReadIndexInfoFiles(src.fsopts.FilePathPrefix(), testNs1ID, src.fsopts.InfoReaderBufferSize()) @@ -362,6 +400,11 @@ func TestBootstrapIndex(t *testing.T) { require.True(t, ok) require.True(t, segment.IsPersisted()) + // Check that the non default index volume type (missing default index volume type) + // was not added to the index results. + _, ok = blockByVolumeType.GetBlock(notDefaultIndexVolumeType) + require.False(t, ok) + // Check that the second segment is mutable and was not written out blockByVolumeType, ok = indexResults[xtime.ToUnixNano(times.start.Add(testIndexBlockSize))] require.True(t, ok) @@ -378,7 +421,7 @@ func TestBootstrapIndex(t *testing.T) { // Validate that wrote the block out (and no index blocks // were read as existing index blocks on disk) counters := scope.Snapshot().Counters() - require.Equal(t, int64(0), counters["fs-bootstrapper.persist-index-blocks-read+"].Value()) + require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-read+"].Value()) require.Equal(t, int64(1), counters["fs-bootstrapper.persist-index-blocks-write+"].Value()) } diff --git a/src/dbnode/storage/bootstrap/cache.go b/src/dbnode/storage/bootstrap/cache.go index f5468e773c..bd4c91d613 100644 --- a/src/dbnode/storage/bootstrap/cache.go +++ b/src/dbnode/storage/bootstrap/cache.go @@ -37,12 +37,15 @@ var ( ) type cache struct { - sync.Once - - fsOpts fs.Options - namespaceDetails []NamespaceDetails - infoFilesByNamespace InfoFilesByNamespace - iOpts instrument.Options + sync.Mutex + + fsOpts fs.Options + namespaceDetails []NamespaceDetails + infoFilesByNamespace InfoFilesByNamespace + indexInfoFilesByNamespace IndexInfoFilesByNamespace + iOpts instrument.Options + hasPopulatedInfo bool + hasPopulatedIndexInfo bool } // NewCache creates a cache specifically to be used during the bootstrap process. @@ -53,9 +56,11 @@ func NewCache(options CacheOptions) (Cache, error) { return nil, err } return &cache{ - fsOpts: options.FilesystemOptions(), - namespaceDetails: options.NamespaceDetails(), - iOpts: options.InstrumentOptions(), + fsOpts: options.FilesystemOptions(), + namespaceDetails: options.NamespaceDetails(), + infoFilesByNamespace: make(InfoFilesByNamespace, len(options.NamespaceDetails())), + indexInfoFilesByNamespace: make(IndexInfoFilesByNamespace, len(options.NamespaceDetails())), + iOpts: options.InstrumentOptions(), }, nil } @@ -83,22 +88,71 @@ func (c *cache) InfoFilesForShard(ns namespace.Metadata, shard uint32) ([]fs.Rea return infoFileResults, nil } +func (c *cache) IndexInfoFilesForNamespace(ns namespace.Metadata) ( + []fs.ReadIndexInfoFileResult, + error, +) { + infoFiles, ok := c.readIndexInfoFiles()[ns] + // This should never happen as Cache object is initialized with all namespaces to bootstrap. + if !ok { + return nil, fmt.Errorf("attempting to read index info files for namespace %v not "+ + "specified at bootstrap startup", ns.ID().String()) + } + return infoFiles, nil +} + +func (c *cache) Evict() { + c.Lock() + defer c.Unlock() + c.hasPopulatedInfo = false + c.hasPopulatedIndexInfo = false +} + func (c *cache) ReadInfoFiles() InfoFilesByNamespace { - c.Once.Do(func() { - c.infoFilesByNamespace = make(InfoFilesByNamespace, len(c.namespaceDetails)) - for _, finder := range c.namespaceDetails { - result := make(InfoFileResultsPerShard, len(finder.Shards)) - for _, shard := range finder.Shards { - result[shard] = fs.ReadInfoFiles(c.fsOpts.FilePathPrefix(), - finder.Namespace.ID(), shard, c.fsOpts.InfoReaderBufferSize(), c.fsOpts.DecodingOptions(), - persist.FileSetFlushType) - } - - c.infoFilesByNamespace[finder.Namespace] = result + c.Lock() + defer c.Unlock() + if !c.hasPopulatedInfo { + c.populateInfoFilesByNamespaceWithLock() + c.hasPopulatedInfo = true + } + return c.infoFilesByNamespace +} + +func (c *cache) populateInfoFilesByNamespaceWithLock() { + for _, finder := range c.namespaceDetails { + // NB(bodu): It is okay to reuse the info files by ns results per shard here + // as the shards were set in the cache ctor and do not change per invocation. + result, ok := c.infoFilesByNamespace[finder.Namespace] + if !ok { + result = make(InfoFileResultsPerShard, len(finder.Shards)) + } + for _, shard := range finder.Shards { + result[shard] = fs.ReadInfoFiles(c.fsOpts.FilePathPrefix(), + finder.Namespace.ID(), shard, c.fsOpts.InfoReaderBufferSize(), + c.fsOpts.DecodingOptions(), persist.FileSetFlushType) } - }) - return c.infoFilesByNamespace + c.infoFilesByNamespace[finder.Namespace] = result + } +} + +func (c *cache) readIndexInfoFiles() IndexInfoFilesByNamespace { + c.Lock() + defer c.Unlock() + if !c.hasPopulatedIndexInfo { + c.populateIndexInfoFilesByNamespaceWithLock() + c.hasPopulatedIndexInfo = true + } + return c.indexInfoFilesByNamespace +} + +func (c *cache) populateIndexInfoFilesByNamespaceWithLock() { + for i := range c.namespaceDetails { + finder := c.namespaceDetails[i] + c.indexInfoFilesByNamespace[finder.Namespace] = fs.ReadIndexInfoFiles( + c.fsOpts.FilePathPrefix(), finder.Namespace.ID(), + c.fsOpts.InfoReaderBufferSize()) + } } type cacheOptions struct { diff --git a/src/dbnode/storage/bootstrap/cache_test.go b/src/dbnode/storage/bootstrap/cache_test.go index 9b7a29114c..88699bbb93 100644 --- a/src/dbnode/storage/bootstrap/cache_test.go +++ b/src/dbnode/storage/bootstrap/cache_test.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" + idxpersist "github.com/m3db/m3/src/m3ninx/persist" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/ident" @@ -133,6 +134,54 @@ func TestCacheReadInfoFilesInvariantViolation(t *testing.T) { require.Error(t, err) } +func TestCacheReadIndexInfoFiles(t *testing.T) { + dir := createTempDir(t) + defer os.RemoveAll(dir) + + md1 := testNamespaceMetadata(t, ident.StringID("ns1")) + md2 := testNamespaceMetadata(t, ident.StringID("ns2")) + + fsOpts := testFilesystemOptions.SetFilePathPrefix(dir) + + shards := map[uint32]struct{}{ + 0: struct{}{}, + 1: struct{}{}, + } + writeIndexFilesets(t, md1.ID(), shards, fsOpts) + writeIndexFilesets(t, md2.ID(), shards, fsOpts) + + opts := NewCacheOptions(). + SetFilesystemOptions(fsOpts). + SetInstrumentOptions(fsOpts.InstrumentOptions()). + SetNamespaceDetails([]NamespaceDetails{ + { + Namespace: md1, + Shards: []uint32{0, 1}, + }, + { + Namespace: md2, + Shards: []uint32{0, 1}, + }, + }) + cache, err := NewCache(opts) + require.NoError(t, err) + + infoFilesByNamespace := cache.ReadInfoFiles() + require.NotEmpty(t, infoFilesByNamespace) + + // Ensure we have two namespaces. + require.Equal(t, 2, len(infoFilesByNamespace)) + + // Ensure each shard has three info files (one for each fileset written). + infoFiles, err := cache.IndexInfoFilesForNamespace(md1) + require.NoError(t, err) + require.Equal(t, 3, len(infoFiles)) + + infoFiles, err = cache.IndexInfoFilesForNamespace(md2) + require.NoError(t, err) + require.Equal(t, 3, len(infoFiles)) +} + func testNamespaceMetadata(t *testing.T, nsID ident.ID) namespace.Metadata { rOpts := testRetentionOptions.SetBlockSize(testBlockSize) md, err := namespace.NewMetadata(nsID, testNamespaceOptions. @@ -154,6 +203,45 @@ type testSeries struct { data []byte } +func writeIndexFilesets( + t *testing.T, + namespace ident.ID, + shards map[uint32]struct{}, + fsOpts fs.Options, +) { + blockStart := testStart + blockSize := 10 * time.Hour + numBlocks := 3 + for i := 0; i < numBlocks; i++ { + writeIndexFiles(t, namespace, shards, blockStart.Add(time.Duration(i)*blockSize), + blockSize, fsOpts) + } +} + +func writeIndexFiles( + t *testing.T, + namespace ident.ID, + shards map[uint32]struct{}, + blockStart time.Time, + blockSize time.Duration, + fsOpts fs.Options, +) { + idxWriter, err := fs.NewIndexWriter(fsOpts) + require.NoError(t, err) + require.NoError(t, idxWriter.Open(fs.IndexWriterOpenOptions{ + Identifier: fs.FileSetFileIdentifier{ + FileSetContentType: persist.FileSetIndexContentType, + Namespace: namespace, + BlockStart: blockStart, + }, + BlockSize: blockSize, + FileSetType: persist.FileSetFlushType, + Shards: shards, + IndexVolumeType: idxpersist.DefaultIndexVolumeType, + })) + require.NoError(t, idxWriter.Close()) +} + func writeFilesets(t *testing.T, namespace ident.ID, shard uint32, fsOpts fs.Options) { inputs := []struct { start time.Time diff --git a/src/dbnode/storage/bootstrap/result/result_index.go b/src/dbnode/storage/bootstrap/result/result_index.go index 1a398139a1..0bfb7ad4ed 100644 --- a/src/dbnode/storage/bootstrap/result/result_index.go +++ b/src/dbnode/storage/bootstrap/result/result_index.go @@ -312,6 +312,11 @@ func (b IndexBlockByVolumeType) SetBlock(volumeType persist.IndexVolumeType, blo b.data[volumeType] = block } +// DeleteBlock deletes an IndexBlock for volumeType. +func (b IndexBlockByVolumeType) DeleteBlock(volumeType persist.IndexVolumeType) { + delete(b.data, volumeType) +} + // Iter returns the underlying iterable map data. func (b IndexBlockByVolumeType) Iter() map[persist.IndexVolumeType]IndexBlock { return b.data diff --git a/src/dbnode/storage/bootstrap/types.go b/src/dbnode/storage/bootstrap/types.go index 186728af31..2abd3b6e68 100644 --- a/src/dbnode/storage/bootstrap/types.go +++ b/src/dbnode/storage/bootstrap/types.go @@ -429,6 +429,9 @@ type InfoFileResultsPerShard map[uint32][]fs.ReadInfoFileResult // InfoFilesByNamespace maps a namespace to info files grouped by shard. type InfoFilesByNamespace map[namespace.Metadata]InfoFileResultsPerShard +// IndexInfoFilesByNamespace maps a namespace to index info files. +type IndexInfoFilesByNamespace map[namespace.Metadata][]fs.ReadIndexInfoFileResult + // Cache provides a snapshot of info files for use throughout all stages of the bootstrap. type Cache interface { // InfoFilesForNamespace returns the info files grouped by namespace. @@ -437,6 +440,9 @@ type Cache interface { // InfoFilesForShard returns the info files grouped by shard for the provided namespace. InfoFilesForShard(ns namespace.Metadata, shard uint32) ([]fs.ReadInfoFileResult, error) + // IndexInfoFilesForNamespace returns the index info files. + IndexInfoFilesForNamespace(ns namespace.Metadata) ([]fs.ReadIndexInfoFileResult, error) + // ReadInfoFiles returns info file results for each shard grouped by namespace. A cached copy // is returned if the info files have already been read. ReadInfoFiles() InfoFilesByNamespace diff --git a/src/dbnode/storage/cleanup.go b/src/dbnode/storage/cleanup.go index d91db51329..41ac07f22a 100644 --- a/src/dbnode/storage/cleanup.go +++ b/src/dbnode/storage/cleanup.go @@ -39,13 +39,21 @@ import ( "go.uber.org/zap" ) -type commitLogFilesFn func(commitlog.Options) (persist.CommitLogFiles, []commitlog.ErrorWithPath, error) -type snapshotMetadataFilesFn func(fs.Options) ([]fs.SnapshotMetadata, []fs.SnapshotMetadataErrorWithPaths, error) +type ( + commitLogFilesFn func(commitlog.Options) ( + persist.CommitLogFiles, + []commitlog.ErrorWithPath, + error, + ) + snapshotMetadataFilesFn func(fs.Options) ( + []fs.SnapshotMetadata, + []fs.SnapshotMetadataErrorWithPaths, + error, + ) +) type snapshotFilesFn func(filePathPrefix string, namespace ident.ID, shard uint32) (fs.FileSetFilesSlice, error) -type deleteFilesFn func(files []string) error - type deleteInactiveDirectoriesFn func(parentDirPath string, activeDirNames []string) error // Narrow interface so as not to expose all the functionality of the commitlog @@ -68,7 +76,7 @@ type cleanupManager struct { snapshotMetadataFilesFn snapshotMetadataFilesFn snapshotFilesFn snapshotFilesFn - deleteFilesFn deleteFilesFn + deleteFilesFn fs.DeleteFilesFn deleteInactiveDirectoriesFn deleteInactiveDirectoriesFn warmFlushCleanupInProgress bool coldFlushCleanupInProgress bool @@ -213,6 +221,7 @@ func (m *cleanupManager) ColdFlushCleanup(t time.Time, isBootstrapped bool) erro return multiErr.FinalError() } + func (m *cleanupManager) Report() { m.RLock() coldFlushCleanupInProgress := m.coldFlushCleanupInProgress diff --git a/src/dbnode/storage/index.go b/src/dbnode/storage/index.go index b7ce9c52ef..4529aa72e7 100644 --- a/src/dbnode/storage/index.go +++ b/src/dbnode/storage/index.go @@ -88,9 +88,7 @@ const ( defaultFlushDocsBatchSize = 8192 ) -var ( - allQuery = idx.NewAllQuery() -) +var allQuery = idx.NewAllQuery() // nolint: maligned type nsIndex struct { @@ -110,7 +108,7 @@ type nsIndex struct { namespaceRuntimeOptsMgr namespace.RuntimeOptionsManager indexFilesetsBeforeFn indexFilesetsBeforeFn - deleteFilesFn deleteFilesFn + deleteFilesFn fs.DeleteFilesFn readIndexInfoFilesFn readIndexInfoFilesFn newBlockFn index.NewBlockFn @@ -1888,6 +1886,10 @@ func (i *nsIndex) CleanupExpiredFileSets(t time.Time) error { return i.deleteFilesFn(filesets) } +// CleanupDuplicateFileSets only considers an index fileset of the same index volume type +// that covers a superset of shard time ranges as a dupe. We can have index filesets +// of the default volume type that have non-overlapping shard time ranges in the node leave +// case where we accept new shards and a index fileset is persisted to disk w/ the new shards. func (i *nsIndex) CleanupDuplicateFileSets() error { fsOpts := i.opts.CommitLogOptions().FilesystemOptions() infoFiles := i.readIndexInfoFilesFn( diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 9897336fec..12df5c7ebf 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -175,7 +175,7 @@ type dbShard struct { newFSMergeWithMemFn newFSMergeWithMemFn filesetsFn filesetsFn filesetPathsBeforeFn filesetPathsBeforeFn - deleteFilesFn deleteFilesFn + deleteFilesFn fs.DeleteFilesFn snapshotFilesFn snapshotFilesFn sleepFn func(time.Duration) identifierPool ident.Pool @@ -1301,7 +1301,6 @@ func (s *dbShard) insertSeriesForIndexingAsyncBatched( entryRefCountIncremented: true, }, }) - // i.e. unable to enqueue into shard insert queue if err != nil { entry.OnIndexFinalize(indexBlockStart) // release any reference's we've held for indexing