From c3962d32cdbbaff30639a2ff8cb3e37c186173f9 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Sat, 24 Nov 2018 01:37:39 -0500 Subject: [PATCH] Add compaction of mutable segments for index blocks --- src/dbnode/server/server.go | 2 + src/dbnode/storage/index/active_segment.go | 75 ++++ src/dbnode/storage/index/block.go | 323 ++++++++++++++---- .../storage/index/compaction/compactor.go | 203 +++++++++++ src/dbnode/storage/index/compaction/plan.go | 5 +- src/dbnode/storage/index/options.go | 16 +- src/dbnode/storage/index/types.go | 7 + .../segment/mem/concurrent_postings_map.go | 6 + src/m3ninx/index/segment/mem/segment.go | 17 + src/m3ninx/index/segment/mem/terms_dict.go | 13 + src/m3ninx/index/segment/mem/types.go | 3 + src/m3ninx/index/segment/types.go | 4 + src/m3ninx/persist/reader.go | 58 ++++ 13 files changed, 668 insertions(+), 64 deletions(-) create mode 100644 src/dbnode/storage/index/active_segment.go create mode 100644 src/dbnode/storage/index/compaction/compactor.go diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 4bba021c9a..555a2fad9a 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -1098,6 +1098,8 @@ func withEncodingAndPoolingOptions( SetInstrumentOptions(iopts). SetMemSegmentOptions( opts.IndexOptions().MemSegmentOptions().SetInstrumentOptions(iopts)). + SetFSTSegmentOptions( + opts.IndexOptions().FSTSegmentOptions().SetInstrumentOptions(iopts)). SetIdentifierPool(identifierPool). SetCheckedBytesPool(bytesPool). SetResultsPool(resultsPool) diff --git a/src/dbnode/storage/index/active_segment.go b/src/dbnode/storage/index/active_segment.go new file mode 100644 index 0000000000..3f8c909f7c --- /dev/null +++ b/src/dbnode/storage/index/active_segment.go @@ -0,0 +1,75 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package index + +import ( + "time" + + "github.com/m3db/m3/src/m3ninx/index/segment" +) + +type readableSegment interface { + Age() time.Duration +} + +type mutableReadableSeg struct { + createdAt time.Time + segment segment.MutableSegment +} + +var _ readableSegment = &mutableReadableSeg{} + +func newMutableReadableSeg(seg segment.MutableSegment) *mutableReadableSeg { + return &mutableReadableSeg{ + createdAt: time.Now(), + segment: seg, + } +} + +func (s *mutableReadableSeg) Segment() segment.MutableSegment { + return s.segment +} + +func (s *mutableReadableSeg) Age() time.Duration { + return time.Since(s.createdAt) +} + +type readableSeg struct { + createdAt time.Time + segment segment.Segment +} + +var _ readableSegment = &readableSeg{} + +func newReadableSeg(seg segment.Segment) *readableSeg { + return &readableSeg{ + createdAt: time.Now(), + segment: seg, + } +} + +func (s *readableSeg) Segment() segment.Segment { + return s.segment +} + +func (s *readableSeg) Age() time.Duration { + return time.Since(s.createdAt) +} diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index b63c4bb931..ecfe8af071 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -27,6 +27,8 @@ import ( "time" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" + "github.com/m3db/m3/src/dbnode/storage/index/compaction" + "github.com/m3db/m3/src/dbnode/storage/index/segments" "github.com/m3db/m3/src/dbnode/storage/namespace" "github.com/m3db/m3/src/m3ninx/doc" m3ninxindex "github.com/m3db/m3/src/m3ninx/index" @@ -57,9 +59,17 @@ var ( type blockState byte const ( - blockStateClosed blockState = iota - blockStateOpen + blockStateOpen blockState = iota blockStateSealed + blockStateMutableEvicted + blockStateClosed +) + +var ( + blockStatesCloseCompactedBlocks = map[blockState]struct{}{ + blockStateMutableEvicted: struct{}{}, + blockStateClosed: struct{}{}, + } ) type newExecutorFn func() (search.Executor, error) @@ -67,7 +77,8 @@ type newExecutorFn func() (search.Executor, error) type block struct { sync.RWMutex state blockState - activeSegment segment.MutableSegment + activeSegment *mutableReadableSeg + frozenSegments []*readableSeg shardRangesSegments []blockShardRangesSegments newExecutorFn newExecutorFn @@ -77,6 +88,11 @@ type block struct { opts Options nsMD namespace.Metadata docsPool doc.DocumentArrayPool + + compacting bool + compactor *compaction.Compactor + + logger xlog.Logger } // blockShardsSegments is a collection of segments that has a mapping of what shards @@ -94,33 +110,61 @@ func NewBlock( md namespace.Metadata, opts Options, ) (Block, error) { - var ( - blockSize = md.Options().IndexOptions().BlockSize() - ) + docsPool := opts.DocumentArrayPool() - // FOLLOWUP(prateek): use this to track segments when we have multiple segments in a Block. 
-	postingsOffset := postings.ID(0)
-	seg, err := mem.NewSegment(postingsOffset, opts.MemSegmentOptions())
+	compactor, err := compaction.NewCompactor(docsPool,
+		documentArrayPoolCapacity,
+		opts.MemSegmentOptions(),
+		opts.FSTSegmentOptions())
 	if err != nil {
 		return nil, err
 	}
 
+	blockSize := md.Options().IndexOptions().BlockSize()
 	b := &block{
-		state:         blockStateOpen,
-		activeSegment: seg,
-
+		state:     blockStateOpen,
 		startTime: startTime,
 		endTime:   startTime.Add(blockSize),
 		blockSize: blockSize,
 		opts:      opts,
 		nsMD:      md,
-		docsPool:  opts.DocumentArrayPool(),
+		docsPool:  docsPool,
+		compactor: compactor,
+		logger:    opts.InstrumentOptions().Logger(),
 	}
 	b.newExecutorFn = b.executorWithRLock
 
+	if _, err := b.rotateActiveSegment(); err != nil {
+		return nil, err
+	}
+
 	return b, nil
 }
 
+func (b *block) newMutableSegment() (segment.MutableSegment, error) {
+	postingsOffset := postings.ID(0)
+	seg, err := mem.NewSegment(postingsOffset, b.opts.MemSegmentOptions())
+	if err != nil {
+		return nil, err
+	}
+
+	return seg, nil
+}
+
+func (b *block) rotateActiveSegment() (*mutableReadableSeg, error) {
+	// NB(r): This may be nil on the first call, during initialization
+	prev := b.activeSegment
+
+	seg, err := b.newMutableSegment()
+	if err != nil {
+		return nil, err
+	}
+
+	b.activeSegment = newMutableReadableSeg(seg)
+
+	return prev, nil
+}
+
 func (b *block) StartTime() time.Time {
 	return b.startTime
 }
@@ -129,10 +173,167 @@ func (b *block) EndTime() time.Time {
 	return b.endTime
 }
 
-func (b *block) WriteBatch(inserts *WriteBatch) (WriteBatchResult, error) {
+func (b *block) maybeCompactWithLock() {
+	if b.compacting || b.state != blockStateOpen {
+		return
+	}
+
+	// Create a logical plan
+	segs := make([]compaction.Segment, 0, 1+len(b.frozenSegments))
+	if b.activeSegment != nil && b.activeSegment.Segment().Size() > 0 {
+		segs = append(segs, compaction.Segment{
+			Age:     b.activeSegment.Age(),
+			Size:    b.activeSegment.Segment().Size(),
+			Type:    segments.MutableType,
+			Segment: b.activeSegment.Segment(),
+		})
+	}
+
+	for _, seg := range b.frozenSegments {
+		segs = append(segs, compaction.Segment{
+			Age:     seg.Age(),
+			Size:    seg.Segment().Size(),
+			Type:    segments.FSTType,
+			Segment: seg.Segment(),
+		})
+	}
+
+	plan, err := compaction.NewPlan(segs, compaction.PlannerOptions{})
+	if err != nil {
+		b.logger.Errorf("could not create index compaction plan: %v", err)
+		return
+	}
+
+	if len(plan.Tasks) == 0 {
+		return
+	}
+
+	var compactingActiveSegment bool
+	if b.activeSegment != nil {
+		for _, task := range plan.Tasks {
+			for _, seg := range task.Segments {
+				if seg.Segment == b.activeSegment.Segment() {
+					compactingActiveSegment = true
+					break
+				}
+			}
+			if compactingActiveSegment {
+				break
+			}
+		}
+	}
+
+	if compactingActiveSegment {
+		// Rotate the current active segment so it's not written to while
+		// we're compacting it
+		prev, err := b.rotateActiveSegment()
+		if err != nil {
+			b.logger.Errorf("could not rotate active segment for compaction: %v", err)
+			return
+		}
+
+		b.frozenSegments = append(b.frozenSegments, newReadableSeg(prev.Segment()))
+	}
+
+	// Kick off compaction
+	b.startCompactWithLock(plan)
+}
+
+func (b *block) startCompactWithLock(plan *compaction.Plan) {
+	b.compacting = true
+	go func() {
+		b.compactWithPlan(plan)
+
+		b.Lock()
+		defer b.Unlock()
+
+		b.cleanupCompactWithLock()
+	}()
+}
+
+func (b *block) cleanupCompactWithLock() {
+	b.compacting = false
+
+	if b.state == blockStateOpen {
+		// See if we need to trigger another compaction
+		b.maybeCompactWithLock()
+		return
+	}
+
+	// Check if we need to
+	// close all the frozen segments
+	_, closeSegments := blockStatesCloseCompactedBlocks[b.state]
+	if !closeSegments {
+		return
+	}
+
+	for _, seg := range b.frozenSegments {
+		err := seg.Segment().Close()
+		if err != nil {
+			b.logger.Errorf("could not close frozen segment: %v", err)
+		}
+	}
+	b.frozenSegments = nil
+}
+
+func (b *block) compactWithPlan(plan *compaction.Plan) {
+	for _, task := range plan.Tasks {
+		if err := b.compactWithTask(task); err != nil {
+			b.logger.Errorf("error compacting segments: %v", err)
+			return
+		}
+	}
+}
+
+func (b *block) compactWithTask(task compaction.Task) error {
+	segments := make([]segment.Segment, 0, len(task.Segments))
+	for _, seg := range task.Segments {
+		segments = append(segments, seg.Segment)
+	}
+
+	compacted, err := b.compactor.Compact(segments)
+	if err != nil {
+		return err
+	}
+
+	// Rotate out the replaced frozen segments and add the compacted one
 	b.Lock()
 	defer b.Unlock()
 
+	newFrozenSegments := make([]*readableSeg, 0, len(b.frozenSegments))
+	for _, frozen := range b.frozenSegments {
+		keepCurr := true
+		for _, seg := range segments {
+			if frozen.Segment() == seg {
+				// Do not keep this one, it was just compacted
+				keepCurr = false
+				break
+			}
+		}
+
+		if keepCurr {
+			newFrozenSegments = append(newFrozenSegments, frozen)
+			continue
+		}
+
+		if err := frozen.Segment().Close(); err != nil {
+			// Already compacted, so there's not much we can do if it
+			// fails to close
+			b.logger.Errorf("unable to close compacted segment: %v", err)
+		}
+	}
+
+	newFrozenSegments = append(newFrozenSegments, newReadableSeg(compacted))
+	b.frozenSegments = newFrozenSegments
+
+	return nil
+}
+
+func (b *block) WriteBatch(inserts *WriteBatch) (WriteBatchResult, error) {
+	b.Lock()
+	defer func() {
+		b.maybeCompactWithLock()
+		b.Unlock()
+	}()
+
 	if b.state != blockStateOpen {
 		err := b.writeBatchErrorInvalidState(b.state)
 		inserts.MarkUnmarkedEntriesError(err)
@@ -151,7 +352,7 @@
 		}, err
 	}
 
-	err := b.activeSegment.InsertBatch(m3ninxindex.Batch{
+	err := b.activeSegment.Segment().InsertBatch(m3ninxindex.Batch{
 		Docs:                inserts.PendingDocs(),
 		AllowPartialUpdates: true,
 	})
@@ -209,7 +410,16 @@
 
 	// start with the segment that's being actively written to (if we have one)
 	if b.activeSegment != nil {
-		reader, err := b.activeSegment.Reader()
+		reader, err := b.activeSegment.Segment().Reader()
+		if err != nil {
+			return nil, err
+		}
+		readers = append(readers, reader)
+	}
+
+	// loop over frozen segments
+	for _, seg := range b.frozenSegments {
+		reader, err := seg.Segment().Reader()
 		if err != nil {
 			return nil, err
 		}
 		readers = append(readers, reader)
 	}
@@ -363,25 +573,6 @@
 			results.Fulfilled().SummaryString(), blockRange.String())
 	}
 
-	// NB: need to check if the current block has been marked 'Sealed' and if so,
-	// mark all incoming mutable segments the same.
-	isSealed := b.IsSealedWithRLock()
-
-	var multiErr xerrors.MultiError
-	for _, seg := range results.Segments() {
-		if x, ok := seg.(segment.MutableSegment); ok {
-			if isSealed {
-				_, err := x.Seal()
-				if err != nil {
-					// if this happens it means a Mutable segment was marked sealed
-					// in the bootstrappers, this should never happen.
-					err := b.bootstrappingSealedMutableSegmentInvariant(err)
-					multiErr = multiErr.Add(err)
-				}
-			}
-		}
-	}
-
 	entry := blockShardRangesSegments{
 		shardTimeRanges: results.Fulfilled(),
 		segments:        results.Segments(),
@@ -400,11 +591,12 @@
 		// This is the case where it cannot wholly replace the current set of blocks
 		// so simply append the segments in this case
 		b.shardRangesSegments = append(b.shardRangesSegments, entry)
-		return multiErr.FinalError()
+		return nil
 	}
 
 	// This is the case where the new segments can wholly replace the
 	// current set of blocks since unfullfilled by the new segments is zero
+	multiErr := xerrors.NewMultiError()
 	for i, group := range b.shardRangesSegments {
 		for _, seg := range group.segments {
 			// Make sure to close the existing segments
@@ -428,7 +620,13 @@ func (b *block) Tick(c context.Cancellable, tickStart time.Time) (BlockTickResul
 	// active segment, can be nil incase we've evicted it already.
 	if b.activeSegment != nil {
 		result.NumSegments++
-		result.NumDocs += b.activeSegment.Size()
+		result.NumDocs += b.activeSegment.Segment().Size()
+	}
+
+	// add frozen segments
+	for _, seg := range b.frozenSegments {
+		result.NumSegments++
+		result.NumDocs += seg.Segment().Size()
 	}
 
 	// any other segments
@@ -455,18 +653,12 @@
 	var multiErr xerrors.MultiError
 
 	// seal active mutable segment.
-	_, err := b.activeSegment.Seal()
+	_, err := b.activeSegment.Segment().Seal()
 	multiErr = multiErr.Add(err)
 
-	// loop over any added mutable segments and seal them too.
-	for _, group := range b.shardRangesSegments {
-		for _, seg := range group.segments {
-			if unsealed, ok := seg.(segment.MutableSegment); ok {
-				_, err := unsealed.Seal()
-				multiErr = multiErr.Add(err)
-			}
-		}
-	}
+	// all frozen segments and added mutable segments can't actually be
+	// written to, and they don't need to be sealed since we never
+	// flush these segments
 
 	return multiErr.FinalError()
 }
@@ -484,21 +676,17 @@ func (b *block) NeedsMutableSegmentsEvicted() bool {
 	b.RLock()
 	defer b.RUnlock()
-	anyMutableSegmentNeedsEviction := b.activeSegment != nil && b.activeSegment.Size() > 0
+	anyMutableSegmentNeedsEviction := b.activeSegment != nil &&
+		b.activeSegment.Segment().Size() > 0
 
 	// can early terminate if we already know we need to flush.
 	if anyMutableSegmentNeedsEviction {
 		return true
 	}
 
-	// otherwise we check all the boostrapped segments and to see if any of them
-	// need a flush
-	for _, shardRangeSegments := range b.shardRangesSegments {
-		for _, seg := range shardRangeSegments.segments {
-			if mutableSeg, ok := seg.(segment.MutableSegment); ok {
-				anyMutableSegmentNeedsEviction = anyMutableSegmentNeedsEviction || mutableSeg.Size() > 0
-			}
-		}
+	// check if any frozen segments need to be flushed.
+	for _, seg := range b.frozenSegments {
+		anyMutableSegmentNeedsEviction = anyMutableSegmentNeedsEviction || seg.Segment().Size() > 0
 	}
 
 	return anyMutableSegmentNeedsEviction
@@ -511,17 +699,25 @@
 	if b.state != blockStateSealed {
 		return results, fmt.Errorf("unable to evict mutable segments, block must be sealed, found: %v", b.state)
 	}
+	b.state = blockStateMutableEvicted
 
 	var multiErr xerrors.MultiError
 
 	// close active segment.
 	if b.activeSegment != nil {
 		results.NumMutableSegments++
-		results.NumDocs += b.activeSegment.Size()
-		multiErr = multiErr.Add(b.activeSegment.Close())
+		results.NumDocs += b.activeSegment.Segment().Size()
+		multiErr = multiErr.Add(b.activeSegment.Segment().Close())
 		b.activeSegment = nil
 	}
 
-	// close any other mutable segments too.
+	// if not compacting, trigger a cleanup so that all frozen segments are
+	// closed; otherwise they will be closed once the currently running
+	// compaction finishes
+	if !b.compacting {
+		b.cleanupCompactWithLock()
+	}
+
+	// close any other mutable segments that were added.
 	for idx := range b.shardRangesSegments {
 		segments := make([]segment.Segment, 0, len(b.shardRangesSegments[idx].segments))
 		for _, seg := range b.shardRangesSegments[idx].segments {
@@ -552,10 +748,17 @@
 
 	// close active segment.
 	if b.activeSegment != nil {
-		multiErr = multiErr.Add(b.activeSegment.Close())
+		multiErr = multiErr.Add(b.activeSegment.Segment().Close())
 		b.activeSegment = nil
 	}
 
+	// if not compacting, trigger a cleanup so that all frozen segments are
+	// closed; otherwise they will be closed once the currently running
+	// compaction finishes
+	if !b.compacting {
+		b.cleanupCompactWithLock()
+	}
+
 	// close any other added segments too.
 	for _, group := range b.shardRangesSegments {
 		for _, seg := range group.segments {
diff --git a/src/dbnode/storage/index/compaction/compactor.go b/src/dbnode/storage/index/compaction/compactor.go
new file mode 100644
index 0000000000..ef70387002
--- /dev/null
+++ b/src/dbnode/storage/index/compaction/compactor.go
@@ -0,0 +1,203 @@
+// Copyright (c) 2018 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package compaction
+
+import (
+	"bytes"
+	"sync"
+
+	"github.com/m3db/m3/src/m3ninx/doc"
+	"github.com/m3db/m3/src/m3ninx/index"
+	"github.com/m3db/m3/src/m3ninx/index/segment"
+	"github.com/m3db/m3/src/m3ninx/index/segment/fst"
+	"github.com/m3db/m3/src/m3ninx/index/segment/mem"
+	"github.com/m3db/m3/src/m3ninx/persist"
+	"github.com/m3db/m3/src/x/mmap"
+)
+
+// Compactor combines mutable and/or FST segments into a single
+// immutable FST segment, reusing internal buffers between compactions.
+type Compactor struct {
+	sync.RWMutex
+
+	docsPool     doc.DocumentArrayPool
+	docsMaxBatch int
+	fstOpts      fst.Options
+	mutableSeg   segment.MutableSegment
+	buff         *bytes.Buffer
+	reader       *bytes.Reader
+}
+
+// NewCompactor returns a new compactor which reuses its buffers across
+// compactions to avoid allocating intermediate buffers each time.
+func NewCompactor(
+	docsPool doc.DocumentArrayPool,
+	docsMaxBatch int,
+	memOpts mem.Options,
+	fstOpts fst.Options,
+) (*Compactor, error) {
+	mutableSeg, err := mem.NewSegment(0, memOpts)
+	if err != nil {
+		return nil, err
+	}
+
+	return &Compactor{
+		docsPool:     docsPool,
+		docsMaxBatch: docsMaxBatch,
+		mutableSeg:   mutableSeg,
+		fstOpts:      fstOpts,
+		buff:         bytes.NewBuffer(nil),
+	}, nil
+}
+
+func (c *Compactor) Compact(segs []segment.Segment) (segment.Segment, error) {
+	if len(segs) == 1 {
+		if seg, ok := segs[0].(segment.MutableSegment); ok {
+			// If there's just a single mutable segment, we can compact it directly
+			return c.compact(seg)
+		}
+	}
+
+	// Need to combine segments first
+	c.mutableSeg.Reset(0)
+
+	batch := c.docsPool.Get()
+	defer func() {
+		c.docsPool.Put(batch)
+	}()
+
+	for _, seg := range segs {
+		reader, err := seg.Reader()
+		if err != nil {
+			return nil, err
+		}
+
+		iter, err := reader.AllDocs()
+		if err != nil {
+			return nil, err
+		}
+
+		for iter.Next() {
+			batch = append(batch, iter.Current())
+			if len(batch) < c.docsMaxBatch {
+				continue
+			}
+
+			err := c.mutableSeg.InsertBatch(index.Batch{Docs: batch})
+			if err != nil {
+				return nil, err
+			}
+			batch = batch[:0]
+		}
+
+		if err := iter.Err(); err != nil {
+			return nil, err
+		}
+		if err := iter.Close(); err != nil {
+			return nil, err
+		}
+	}
+
+	if len(batch) != 0 {
+		// Flush last batch
+		err := c.mutableSeg.InsertBatch(index.Batch{Docs: batch})
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return c.compact(c.mutableSeg)
+}
+
+func (c *Compactor) compact(
+	seg segment.MutableSegment,
+) (segment.Segment, error) {
+	writer, err := persist.NewMutableSegmentFileSetWriter()
+	if err != nil {
+		return nil, err
+	}
+
+	if err := writer.Reset(seg); err != nil {
+		return nil, err
+	}
+
+	success := false
+	fstData := &fstSegmentMetadata{
+		major:    writer.MajorVersion(),
+		minor:    writer.MinorVersion(),
+		metadata: append([]byte(nil), writer.SegmentMetadata()...),
+		files:    make([]persist.IndexSegmentFile, 0, len(writer.Files())),
+	}
+	// Clean up in case we run into issues
+	defer func() {
+		if !success {
+			for _, f := range fstData.files {
+				f.Close()
+			}
+		}
+	}()
+
+	for _, f := range writer.Files() {
+		c.buff.Reset()
+		if err := writer.WriteFile(f, c.buff); err != nil {
+			return nil, err
+		}
+
+		fileBytes := c.buff.Bytes()
+
+		// Copy bytes to new mmap region to hide from the GC
+		mmapedResult, err := mmap.Bytes(int64(len(fileBytes)), mmap.Options{
+			Read:  true,
+			Write: true,
+		})
+		if err != nil {
+			return nil, err
+		}
+		copy(mmapedResult.Result, fileBytes)
+
+		segmentFile := persist.NewMmapedIndexSegmentFile(f, nil, mmapedResult.Result)
+		fstData.files = append(fstData.files, segmentFile)
+	}
+
+	// NB: need to mark success here as the NewSegment call assumes ownership of
+	// the provided bytes regardless of success/failure.
+ success = true + + return persist.NewSegment(fstData, c.fstOpts) +} + +type fstSegmentMetadata struct { + major int + minor int + metadata []byte + files []persist.IndexSegmentFile +} + +var _ persist.IndexSegmentFileSet = &fstSegmentMetadata{} + +func (f *fstSegmentMetadata) SegmentType() persist.IndexSegmentType { + return persist.FSTIndexSegmentType +} +func (f *fstSegmentMetadata) MajorVersion() int { return f.major } +func (f *fstSegmentMetadata) MinorVersion() int { return f.minor } +func (f *fstSegmentMetadata) SegmentMetadata() []byte { return f.metadata } +func (f *fstSegmentMetadata) Files() []persist.IndexSegmentFile { + return f.files +} diff --git a/src/dbnode/storage/index/compaction/plan.go b/src/dbnode/storage/index/compaction/plan.go index 4bd41e5801..5970358ebd 100644 --- a/src/dbnode/storage/index/compaction/plan.go +++ b/src/dbnode/storage/index/compaction/plan.go @@ -24,7 +24,6 @@ import ( "errors" "fmt" "sort" - "time" "github.com/m3db/m3/src/dbnode/storage/index/segments" ) @@ -61,8 +60,8 @@ var ( // DefaultOptions are the default compaction PlannerOptions. DefaultOptions = PlannerOptions{ - MutableSegmentSizeThreshold: 1 << 16, // 64K - MutableCompactionAgeThreshold: 15 * time.Second, // any mutable segment 15s or older is eligible for compactions + MutableSegmentSizeThreshold: 0, // any mutable segment is eligible for compactions + MutableCompactionAgeThreshold: 0, // any mutable segment is eligible for compactions Levels: DefaultLevels, // sizes defined above OrderBy: TasksOrderedByOldestMutableAndSize, // compact mutable segments first } diff --git a/src/dbnode/storage/index/options.go b/src/dbnode/storage/index/options.go index 024d7cd8a1..6c7ff6cb41 100644 --- a/src/dbnode/storage/index/options.go +++ b/src/dbnode/storage/index/options.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/dbnode/clock" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/m3ninx/index/segment/mem" "github.com/m3db/m3x/ident" "github.com/m3db/m3x/instrument" @@ -56,6 +57,7 @@ type opts struct { clockOpts clock.Options instrumentOpts instrument.Options memOpts mem.Options + fstOpts fst.Options idPool ident.Pool bytesPool pool.CheckedBytesPool resultsPool ResultsPool @@ -83,11 +85,13 @@ func NewOptions() Options { }) docArrayPool.Init() + instrumentOpts := instrument.NewOptions() opts := &opts{ insertMode: defaultIndexInsertMode, clockOpts: clock.NewOptions(), - instrumentOpts: instrument.NewOptions(), + instrumentOpts: instrumentOpts, memOpts: mem.NewOptions().SetNewUUIDFn(undefinedUUIDFn), + fstOpts: fst.NewOptions().SetInstrumentOptions(instrumentOpts), bytesPool: bytesPool, idPool: idPool, resultsPool: resultsPool, @@ -152,6 +156,16 @@ func (o *opts) MemSegmentOptions() mem.Options { return o.memOpts } +func (o *opts) SetFSTSegmentOptions(value fst.Options) Options { + opts := *o + opts.fstOpts = value + return &opts +} + +func (o *opts) FSTSegmentOptions() fst.Options { + return o.fstOpts +} + func (o *opts) SetIdentifierPool(value ident.Pool) Options { opts := *o opts.idPool = value diff --git a/src/dbnode/storage/index/types.go b/src/dbnode/storage/index/types.go index 0623f6cb93..581d6a9127 100644 --- a/src/dbnode/storage/index/types.go +++ b/src/dbnode/storage/index/types.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" "github.com/m3db/m3/src/m3ninx/doc" "github.com/m3db/m3/src/m3ninx/idx" + 
"github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/m3ninx/index/segment/mem" "github.com/m3db/m3x/context" "github.com/m3db/m3x/ident" @@ -574,6 +575,12 @@ type Options interface { // MemSegmentOptions returns the mem segment options. MemSegmentOptions() mem.Options + // SetFSTSegmentOptions sets the fst segment options. + SetFSTSegmentOptions(value fst.Options) Options + + // FSTSegmentOptions returns the fst segment options. + FSTSegmentOptions() fst.Options + // SetIdentifierPool sets the identifier pool. SetIdentifierPool(value ident.Pool) Options diff --git a/src/m3ninx/index/segment/mem/concurrent_postings_map.go b/src/m3ninx/index/segment/mem/concurrent_postings_map.go index f3afb84a55..ba9f5ff33c 100644 --- a/src/m3ninx/index/segment/mem/concurrent_postings_map.go +++ b/src/m3ninx/index/segment/mem/concurrent_postings_map.go @@ -124,3 +124,9 @@ func (m *concurrentPostingsMap) GetRegex(re *regexp.Regexp) (postings.List, bool } return pl, true } + +func (m *concurrentPostingsMap) Reset() { + m.Lock() + m.postingsMap.Reset() + m.Unlock() +} diff --git a/src/m3ninx/index/segment/mem/segment.go b/src/m3ninx/index/segment/mem/segment.go index a01687be8c..c48cd48b08 100644 --- a/src/m3ninx/index/segment/mem/segment.go +++ b/src/m3ninx/index/segment/mem/segment.go @@ -84,6 +84,23 @@ func NewSegment(offset postings.ID, opts Options) (sgmt.MutableSegment, error) { return s, nil } +func (s *segment) Reset(offset postings.ID) { + s.state.Lock() + defer s.state.Unlock() + + s.termsDict.Reset() + s.readerID = postings.NewAtomicID(offset) + + var empty doc.Document + for i := range s.docs.data { + s.docs.data[i] = empty + } + s.docs.data = s.docs.data[:0] + + s.writer.idSet.Reset() + s.writer.nextID = offset +} + func (s *segment) Size() int64 { s.state.RLock() closed := s.state.closed diff --git a/src/m3ninx/index/segment/mem/terms_dict.go b/src/m3ninx/index/segment/mem/terms_dict.go index 54bcf6867c..046da39fd9 100644 --- a/src/m3ninx/index/segment/mem/terms_dict.go +++ b/src/m3ninx/index/segment/mem/terms_dict.go @@ -116,6 +116,19 @@ func (d *termsDict) MatchRegexp( return pl } +func (d *termsDict) Reset() { + d.fields.Lock() + defer d.fields.Unlock() + + // NB(r): We actually want to keep the terms maps around so that they + // can be reused and avoid reallocation, so instead of deleting them + // we just reset each one + for _, entry := range d.fields.Iter() { + postingsMap := entry.Value() + postingsMap.Reset() + } +} + func (d *termsDict) getOrAddName(name []byte) *concurrentPostingsMap { // Cheap read lock to see if it already exists. d.fields.RLock() diff --git a/src/m3ninx/index/segment/mem/types.go b/src/m3ninx/index/segment/mem/types.go index da6eacd037..c19cb6480d 100644 --- a/src/m3ninx/index/segment/mem/types.go +++ b/src/m3ninx/index/segment/mem/types.go @@ -50,6 +50,9 @@ type termsDictionary interface { // Terms returns the known terms values for the given field. Terms(field []byte) sgmt.TermsIterator + + // Reset resets the terms dictionary for reuse. + Reset() } // ReadableSegment is an internal interface for reading from a segment. 
diff --git a/src/m3ninx/index/segment/types.go b/src/m3ninx/index/segment/types.go index 87d35573d3..4e8f607b3f 100644 --- a/src/m3ninx/index/segment/types.go +++ b/src/m3ninx/index/segment/types.go @@ -24,6 +24,7 @@ import ( "errors" "github.com/m3db/m3/src/m3ninx/index" + "github.com/m3db/m3/src/m3ninx/postings" ) var ( @@ -88,6 +89,9 @@ type MutableSegment interface { Segment index.Writer + // Reset resets the mutable segment for reuse. + Reset(offset postings.ID) + // Seal marks the Mutable Segment immutable. Seal() (Segment, error) diff --git a/src/m3ninx/persist/reader.go b/src/m3ninx/persist/reader.go index dfcfc28d97..583877ca5b 100644 --- a/src/m3ninx/persist/reader.go +++ b/src/m3ninx/persist/reader.go @@ -21,11 +21,14 @@ package persist import ( + "bytes" "fmt" "io" + "os" "github.com/m3db/m3/src/m3ninx/index/segment/fst" "github.com/m3db/m3/src/m3ninx/x" + "github.com/m3db/m3/src/x/mmap" ) // NewSegment returns a new fst.Segment backed by the provided fileset. @@ -116,3 +119,58 @@ func newSafeIndexSegmentFileSetCloser(fileset IndexSegmentFileSet) io.Closer { } return x.NewSafeMultiCloser(closers...) } + +// NewMmapedIndexSegmentFile returns an IndexSegmentFile backed by the provided bytes and FD. +// The returned object assumes ownership of the input fd, and mmap-ed bytes. +func NewMmapedIndexSegmentFile( + fileType IndexSegmentFileType, + fd *os.File, + bytesMmap []byte, +) IndexSegmentFile { + r := &readableIndexSegmentFileMmap{ + fileType: fileType, + fd: fd, + bytesMmap: bytesMmap, + } + r.reader.Reset(r.bytesMmap) + return r +} + +type readableIndexSegmentFileMmap struct { + fileType IndexSegmentFileType + fd *os.File + bytesMmap []byte + reader bytes.Reader +} + +var _ IndexSegmentFile = &readableIndexSegmentFileMmap{} + +func (f *readableIndexSegmentFileMmap) SegmentFileType() IndexSegmentFileType { + return f.fileType +} + +func (f *readableIndexSegmentFileMmap) Bytes() ([]byte, error) { + return f.bytesMmap, nil +} + +func (f *readableIndexSegmentFileMmap) Read(b []byte) (int, error) { + return f.reader.Read(b) +} + +func (f *readableIndexSegmentFileMmap) Close() error { + // Be sure to close the mmap before the file + if f.bytesMmap != nil { + if err := mmap.Munmap(f.bytesMmap); err != nil { + return err + } + f.bytesMmap = nil + } + if f.fd != nil { + if err := f.fd.Close(); err != nil { + return err + } + f.fd = nil + } + f.reader.Reset(nil) + return nil +}
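
To make the end-to-end flow concrete, here is a minimal usage sketch of the Compactor introduced above. The compactFrozen helper and the docsMaxBatch constant are illustrative only (block.go wires the real values from its options and documentArrayPoolCapacity); NewCompactor, Compact and the segment Close calls are the APIs used by this patch.

package example

import (
	"github.com/m3db/m3/src/dbnode/storage/index/compaction"
	"github.com/m3db/m3/src/m3ninx/doc"
	"github.com/m3db/m3/src/m3ninx/index/segment"
	"github.com/m3db/m3/src/m3ninx/index/segment/fst"
	"github.com/m3db/m3/src/m3ninx/index/segment/mem"
)

// compactFrozen combines the given segments into a single immutable FST
// segment and then closes the replaced inputs, mirroring what
// block.compactWithTask does under the block's lock.
func compactFrozen(
	docsPool doc.DocumentArrayPool,
	memOpts mem.Options,
	fstOpts fst.Options,
	segs []segment.Segment,
) (segment.Segment, error) {
	// docsMaxBatch bounds how many documents are buffered per InsertBatch
	// call while combining; illustrative value only.
	const docsMaxBatch = 256

	compactor, err := compaction.NewCompactor(docsPool, docsMaxBatch, memOpts, fstOpts)
	if err != nil {
		return nil, err
	}

	compacted, err := compactor.Compact(segs)
	if err != nil {
		return nil, err
	}

	// Compact does not close its inputs; once the compacted segment is
	// swapped in, the replaced segments are closed by the caller. A close
	// failure here is only log-worthy, as in block.compactWithTask.
	for _, seg := range segs {
		_ = seg.Close()
	}

	return compacted, nil
}

The returned segment is backed by the anonymous mmap'd regions created in Compactor.compact, so closing it eventually releases those regions through the readableIndexSegmentFileMmap.Close path added in persist/reader.go above.
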