From 1a3f709c14b56814ea2e6baef0a057434d0cbfb1 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 10 Jan 2019 11:30:20 -0500 Subject: [PATCH 1/9] Create new config and wire it up --- src/cmd/services/m3dbnode/config/fs.go | 8 +++++++ src/dbnode/persist/fs/options.go | 32 ++++++++++++++++++++++++++ src/dbnode/server/server.go | 4 +++- 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/cmd/services/m3dbnode/config/fs.go b/src/cmd/services/m3dbnode/config/fs.go index 655866391d..8d74601efe 100644 --- a/src/cmd/services/m3dbnode/config/fs.go +++ b/src/cmd/services/m3dbnode/config/fs.go @@ -75,6 +75,14 @@ type FilesystemConfiguration struct { // Mmap is the mmap options which features are primarily platform dependent Mmap *MmapConfiguration `yaml:"mmap"` + + // ForceIndexSummariesMmapMemory forces the mmap that stores the index lookup bytes + // to be an anonymous region in memory as opposed to a file-based mmap. + ForceIndexSummariesMmapMemory bool `yaml:"force_index_summaries_mmap_memory"` + + // ForceBloomFilterMmapMemory forces the mmap that stores the index lookup bytes + // to be an anonymous region in memory as opposed to a file-based mmap. + ForceBloomFilterMmapMemory bool `yaml:"force_bloom_filter_mmap_memory"` } // MmapConfiguration is the mmap configuration. diff --git a/src/dbnode/persist/fs/options.go b/src/dbnode/persist/fs/options.go index ee9ac3d476..366cc4583c 100644 --- a/src/dbnode/persist/fs/options.go +++ b/src/dbnode/persist/fs/options.go @@ -58,6 +58,14 @@ const ( // defaultMmapHugePagesThreshold is the default threshold for when to enable huge pages if enabled defaultMmapHugePagesThreshold = 2 << 14 // 32kb (or when eclipsing 8 pages of default 4096 page size) + + // defaultForceIndexSummariesMmapMemory is the default configuration for whether the bytes for the index + // summaries file should be mmap'd as an anonymous region (force completely into memory) or mmap'd as a file. + defaultForceIndexSummariesMmapMemory = false + + // defaultForceIndexBloomFilterMmapMemory is the default configuration for whether the bytes for the bloom filter + // should be mmap'd as an anonymous region (forced completely into memory) or mmap'd as a file. + defaultForceIndexBloomFilterMmapMemory = false ) var ( @@ -79,6 +87,8 @@ type options struct { newDirectoryMode os.FileMode indexSummariesPercent float64 indexBloomFilterFalsePositivePercent float64 + forceIndexSummariesMmapMemory bool + forceBloomFilterMmapMemory bool writerBufferSize int dataReaderBufferSize int infoReaderBufferSize int @@ -110,6 +120,8 @@ func NewOptions() Options { newDirectoryMode: defaultNewDirectoryMode, indexSummariesPercent: defaultIndexSummariesPercent, indexBloomFilterFalsePositivePercent: defaultIndexBloomFilterFalsePositivePercent, + forceIndexSummariesMmapMemory: defaultForceIndexSummariesMmapMemory, + forceBloomFilterMmapMemory: defaultForceIndexBloomFilterMmapMemory, writerBufferSize: defaultWriterBufferSize, dataReaderBufferSize: defaultDataReaderBufferSize, infoReaderBufferSize: defaultInfoReaderBufferSize, @@ -232,6 +244,26 @@ func (o *options) IndexBloomFilterFalsePositivePercent() float64 { return o.indexBloomFilterFalsePositivePercent } +func (o *options) SetForceIndexSummariesMmapMemory(value bool) Options { + opts := *o + opts.forceIndexSummariesMmapMemory = value + return &opts +} + +func (o *options) ForceIndexSummariesMmapMemory() bool { + return o.forceIndexSummariesMmapMemory +} + +func (o *options) SetForceBloomFilterMmapMemory(value bool) Options { + opts := *o + opts.forceBloomFilterMmapMemory = value + return &opts +} + +func (o *options) ForceBloomFilterMmapMemory() bool { + return o.forceBloomFilterMmapMemory +} + func (o *options) SetWriterBufferSize(value int) Options { opts := *o opts.writerBufferSize = value diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 6e5ee5f0ee..0297209dfe 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -312,7 +312,9 @@ func Run(runOpts RunOptions) { SetMmapHugeTLBThreshold(mmapCfg.HugeTLB.Threshold). SetRuntimeOptionsManager(runtimeOptsMgr). SetTagEncoderPool(tagEncoderPool). - SetTagDecoderPool(tagDecoderPool) + SetTagDecoderPool(tagDecoderPool). + SetForceIndexSummariesMmapMemory(cfg.Filesystem.ForceIndexSummariesMmapMemory). + SetForceBloomFilterMmapMemory(cfg.Filesystem.ForceBloomFilterMmapMemory) var commitLogQueueSize int specified := cfg.CommitLog.Queue.Size From bc3ca5a1fd43e1ba521c456af2d951dbcc21e310 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 10 Jan 2019 11:31:16 -0500 Subject: [PATCH 2/9] Allow mmaping files in summaries lookup --- src/dbnode/persist/fs/index_lookup.go | 53 ++++++++++++------- .../persist/fs/index_lookup_prop_test.go | 11 ++-- src/dbnode/persist/fs/index_lookup_test.go | 2 + src/dbnode/persist/fs/seek.go | 1 + src/dbnode/persist/fs/types.go | 16 ++++++ 5 files changed, 62 insertions(+), 21 deletions(-) diff --git a/src/dbnode/persist/fs/index_lookup.go b/src/dbnode/persist/fs/index_lookup.go index ab60ab603e..750a64fca4 100644 --- a/src/dbnode/persist/fs/index_lookup.go +++ b/src/dbnode/persist/fs/index_lookup.go @@ -56,7 +56,7 @@ func newNearestIndexOffsetLookup( summaryIDsOffsets: summaryIDsOffsets, summariesMmap: summariesMmap, decoderStream: decoderStream, - msgpackDecoder: msgpack.NewDecoder(nil), + msgpackDecoder: msgpack.NewDecoder(decoderStream), isClone: false, } } @@ -66,11 +66,12 @@ func (il *nearestIndexOffsetLookup) concurrentClone() (*nearestIndexOffsetLookup return nil, errCloneShouldNotBeCloned } + decoderStream := xmsgpack.NewDecoderStream(nil) return &nearestIndexOffsetLookup{ summaryIDsOffsets: il.summaryIDsOffsets, summariesMmap: il.summariesMmap, - decoderStream: xmsgpack.NewDecoderStream(nil), - msgpackDecoder: msgpack.NewDecoder(nil), + decoderStream: decoderStream, + msgpackDecoder: msgpack.NewDecoder(decoderStream), isClone: true, }, nil } @@ -163,6 +164,7 @@ func newNearestIndexOffsetLookupFromSummariesFile( expectedSummariesDigest uint32, decoder *xmsgpack.Decoder, numEntries int, + forceMmapMemory bool, ) (*nearestIndexOffsetLookup, error) { summariesFd := summariesFdWithDigest.Fd() stat, err := summariesFd.Stat() @@ -171,22 +173,37 @@ func newNearestIndexOffsetLookupFromSummariesFile( } numBytes := stat.Size() - // Request an anonymous (non-file-backed) mmap region. Note that we're going - // to use the mmap'd region to store the read-only summaries data, but the mmap - // region itself needs to be writable so we can copy the bytes from disk - // into it - mmapResult, err := mmap.Bytes(numBytes, mmap.Options{Read: true, Write: true}) - if err != nil { - return nil, err - } - summariesMmap := mmapResult.Result + var summariesMmap []byte - // Validate the bytes on disk using the digest, and read them into - // the mmap'd region - _, err = summariesFdWithDigest.ReadAllAndValidate(summariesMmap, expectedSummariesDigest) - if err != nil { - mmap.Munmap(summariesMmap) - return nil, err + if forceMmapMemory { + // Request an anonymous (non-file-backed) mmap region. Note that we're going + // to use the mmap'd region to store the read-only summaries data, but the mmap + // region itself needs to be writable so we can copy the bytes from disk + // into it + mmapResult, err := mmap.Bytes(numBytes, mmap.Options{Read: true, Write: true}) + if err != nil { + return nil, err + } + summariesMmap = mmapResult.Result + + // Validate the bytes on disk using the digest, and read them into + // the mmap'd region + _, err = summariesFdWithDigest.ReadAllAndValidate(summariesMmap, expectedSummariesDigest) + if err != nil { + mmap.Munmap(summariesMmap) + return nil, err + } + } else { + mmapResult, err := mmap.File(summariesFdWithDigest.Fd(), mmap.Options{Read: true, Write: false}) + if err != nil { + return nil, err + } + summariesMmap = mmapResult.Result + if calculatedDigest := digest.Checksum(summariesMmap); calculatedDigest != expectedSummariesDigest { + mmap.Munmap(summariesMmap) + return nil, fmt.Errorf("expected summaries file digest was: %d, but got: %d", + expectedSummariesDigest, calculatedDigest) + } } // Msgpack decode the entire summaries file (we need to store the offsets diff --git a/src/dbnode/persist/fs/index_lookup_prop_test.go b/src/dbnode/persist/fs/index_lookup_prop_test.go index f03a257aaf..e7751bfd7e 100644 --- a/src/dbnode/persist/fs/index_lookup_prop_test.go +++ b/src/dbnode/persist/fs/index_lookup_prop_test.go @@ -118,7 +118,7 @@ func TestIndexLookupWriteRead(t *testing.T) { expectedSummariesDigest := calculateExpectedChecksum(t, summariesFilePath) decoder := msgpack.NewDecoder(options.DecodingOptions()) indexLookup, err := newNearestIndexOffsetLookupFromSummariesFile( - summariesFdWithDigest, expectedSummariesDigest, decoder, len(writes)) + summariesFdWithDigest, expectedSummariesDigest, decoder, len(writes), input.forceMmapMemory) if err != nil { return false, fmt.Errorf("err reading index lookup from summaries file: %v, ", err) } @@ -173,6 +173,9 @@ type propTestInput struct { realWrites []generatedWrite // Shard number to use for the files shard uint32 + // Whether the summaries file bytes should be mmap'd as an + // anonymous region or file. + forceMmapMemory bool } type generatedWrite struct { @@ -196,10 +199,12 @@ func genPropTestInput(numRealWrites int) gopter.Gen { return gopter.CombineGens( gen.SliceOfN(numRealWrites, genWrite()), gen.UInt32(), + gen.Bool(), ).Map(func(vals []interface{}) propTestInput { return propTestInput{ - realWrites: vals[0].([]generatedWrite), - shard: vals[1].(uint32), + realWrites: vals[0].([]generatedWrite), + shard: vals[1].(uint32), + forceMmapMemory: vals[2].(bool), } }) } diff --git a/src/dbnode/persist/fs/index_lookup_test.go b/src/dbnode/persist/fs/index_lookup_test.go index 63217c28f3..8550c5fac6 100644 --- a/src/dbnode/persist/fs/index_lookup_test.go +++ b/src/dbnode/persist/fs/index_lookup_test.go @@ -79,6 +79,7 @@ func TestNewNearestIndexOffsetDetectsUnsortedFiles(t *testing.T) { expectedDigest, msgpack.NewDecoder(nil), len(outOfOrderSummaries), + false, ) expectedErr := fmt.Errorf("summaries file is not sorted: %s", file.Name()) require.Equal(t, expectedErr, err) @@ -211,6 +212,7 @@ func newIndexLookupWithSummaries(t *testing.T, indexSummaries []schema.IndexSumm expectedDigest, msgpack.NewDecoder(nil), len(indexSummaries), + false, ) require.NoError(t, err) return indexLookup diff --git a/src/dbnode/persist/fs/seek.go b/src/dbnode/persist/fs/seek.go index b80ffcc6ef..ef481fb90c 100644 --- a/src/dbnode/persist/fs/seek.go +++ b/src/dbnode/persist/fs/seek.go @@ -272,6 +272,7 @@ func (s *seeker) Open(namespace ident.ID, shard uint32, blockStart time.Time) er expectedDigests.summariesDigest, s.decoder, int(s.summariesInfo.Summaries), + s.opts.opts.ForceIndexSummariesMmapMemory(), ) if err != nil { s.Close() diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 62fe4a1c53..b53077a0c4 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -374,6 +374,22 @@ type Options interface { // rate to use for the index bloom filter size and k hashes estimation IndexBloomFilterFalsePositivePercent() float64 + // SetForceIndexSummariesMmapMemory sets whether the summaries files will be mmap'd + // as an anonymous region, or as a file. + SetForceIndexSummariesMmapMemory(value bool) Options + + // ForceIndexSummariesMmapMemory returns whether the summaries files will be mmap'd + // as an anonymous region, or as a file. + ForceIndexSummariesMmapMemory() bool + + // SetForceBloomFilterMmapMemory sets whether the bloom filters will be mmap'd + // as an anonymous region, or as a file. + SetForceBloomFilterMmapMemory(value bool) Options + + // ForceBloomFilterMmapMemory returns whether the bloom filters will be mmap'd + // as an anonymous region, or as a file. + ForceBloomFilterMmapMemory() bool + // SetWriterBufferSize sets the buffer size for writing TSDB files SetWriterBufferSize(value int) Options From 6e317a31b957ca1336ad79d9158e9af09a5680b5 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 10 Jan 2019 11:33:55 -0500 Subject: [PATCH 3/9] improve coverage --- src/dbnode/persist/fs/index_lookup_test.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/dbnode/persist/fs/index_lookup_test.go b/src/dbnode/persist/fs/index_lookup_test.go index 8550c5fac6..7e6b0a181c 100644 --- a/src/dbnode/persist/fs/index_lookup_test.go +++ b/src/dbnode/persist/fs/index_lookup_test.go @@ -110,7 +110,7 @@ func TestClosingCloneDoesNotAffectParent(t *testing.T) { }, } - indexLookup := newIndexLookupWithSummaries(t, indexSummaries) + indexLookup := newIndexLookupWithSummaries(t, indexSummaries, false) clone, err := indexLookup.concurrentClone() require.NoError(t, err) require.NoError(t, clone.close()) @@ -126,6 +126,14 @@ func TestClosingCloneDoesNotAffectParent(t *testing.T) { } func TestParentAndClonesSafeForConcurrentUse(t *testing.T) { + testParentAndClonesSafeForConcurrentUse(t, false) +} + +func TestParentAndClonesSafeForConcurrentUseForceMmapMemory(t *testing.T) { + testParentAndClonesSafeForConcurrentUse(t, true) +} + +func testParentAndClonesSafeForConcurrentUse(t *testing.T, forceMmapMemory bool) { numSummaries := 1000 numClones := 10 @@ -141,7 +149,7 @@ func TestParentAndClonesSafeForConcurrentUse(t *testing.T) { sort.Sort(sortableSummaries(indexSummaries)) // Create indexLookup and associated clones - indexLookup := newIndexLookupWithSummaries(t, indexSummaries) + indexLookup := newIndexLookupWithSummaries(t, indexSummaries, forceMmapMemory) clones := []*nearestIndexOffsetLookup{} for i := 0; i < numClones; i++ { clone, err := indexLookup.concurrentClone() @@ -185,7 +193,8 @@ func TestParentAndClonesSafeForConcurrentUse(t *testing.T) { // newIndexLookupWithSummaries will return a new index lookup that is backed by the provided // indexSummaries (in the order that they are provided). -func newIndexLookupWithSummaries(t *testing.T, indexSummaries []schema.IndexSummary) *nearestIndexOffsetLookup { +func newIndexLookupWithSummaries( + t *testing.T, indexSummaries []schema.IndexSummary, forceMmapMemory bool) *nearestIndexOffsetLookup { // Create a temp file file, err := ioutil.TempFile("", "index-lookup-sort") require.NoError(t, err) @@ -212,7 +221,7 @@ func newIndexLookupWithSummaries(t *testing.T, indexSummaries []schema.IndexSumm expectedDigest, msgpack.NewDecoder(nil), len(indexSummaries), - false, + forceMmapMemory, ) require.NoError(t, err) return indexLookup From d1f6beef901fb0dda02e1628836e07bbc03d0c78 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 10 Jan 2019 11:39:31 -0500 Subject: [PATCH 4/9] make bloom filter mmap configurable --- src/dbnode/persist/fs/bloom_filter.go | 52 +++++++++++++++++---------- src/dbnode/persist/fs/read.go | 1 + src/dbnode/persist/fs/seek.go | 1 + 3 files changed, 36 insertions(+), 18 deletions(-) diff --git a/src/dbnode/persist/fs/bloom_filter.go b/src/dbnode/persist/fs/bloom_filter.go index d27c13789f..f030a2b487 100644 --- a/src/dbnode/persist/fs/bloom_filter.go +++ b/src/dbnode/persist/fs/bloom_filter.go @@ -21,6 +21,7 @@ package fs import ( + "fmt" "os" "github.com/m3db/bloom" @@ -73,6 +74,7 @@ func newManagedConcurrentBloomFilterFromFile( expectedDigest uint32, numElementsM uint, numHashesK uint, + forceMmapMemory bool, ) (*ManagedConcurrentBloomFilter, error) { // Determine how many bytes to request for the mmap'd region bloomFilterFdWithDigest.Reset(bloomFilterFd) @@ -82,25 +84,39 @@ func newManagedConcurrentBloomFilterFromFile( } numBytes := stat.Size() - // Request an anonymous (non-file-backed) mmap region. Note that we're going - // to use the mmap'd region to create a read-only bloom filter, but the mmap - // region itself needs to be writable so we can copy the bytes from disk - // into it - result, err := mmap.Bytes(numBytes, mmap.Options{Read: true, Write: true}) - if err != nil { - return nil, err - } - anonMmap := result.Result + var mmapBytes []byte + if forceMmapMemory { + // Request an anonymous (non-file-backed) mmap region. Note that we're going + // to use the mmap'd region to create a read-only bloom filter, but the mmap + // region itself needs to be writable so we can copy the bytes from disk + // into it + result, err := mmap.Bytes(numBytes, mmap.Options{Read: true, Write: true}) + if err != nil { + return nil, err + } + mmapBytes = result.Result - // Validate the bytes on disk using the digest, and read them into - // the mmap'd region - _, err = bloomFilterFdWithDigest.ReadAllAndValidate( - anonMmap, expectedDigest) - if err != nil { - mmap.Munmap(anonMmap) - return nil, err + // Validate the bytes on disk using the digest, and read them into + // the mmap'd region + _, err = bloomFilterFdWithDigest.ReadAllAndValidate(mmapBytes, expectedDigest) + if err != nil { + mmap.Munmap(mmapBytes) + return nil, err + } + } else { + mmapResult, err := mmap.File(bloomFilterFdWithDigest.Fd(), mmap.Options{Read: true, Write: false}) + if err != nil { + return nil, err + } + + mmapBytes = mmapResult.Result + if calculatedDigest := digest.Checksum(mmapBytes); calculatedDigest != expectedDigest { + mmap.Munmap(mmapBytes) + return nil, fmt.Errorf("expected summaries file digest was: %d, but got: %d", + expectedDigest, calculatedDigest) + } } - bloomFilter := bloom.NewConcurrentReadOnlyBloomFilter(numElementsM, numHashesK, anonMmap) - return newManagedConcurrentBloomFilter(bloomFilter, anonMmap), nil + bloomFilter := bloom.NewConcurrentReadOnlyBloomFilter(numElementsM, numHashesK, mmapBytes) + return newManagedConcurrentBloomFilter(bloomFilter, mmapBytes), nil } diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 558c4d738e..efc3edc0ca 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -369,6 +369,7 @@ func (r *reader) ReadBloomFilter() (*ManagedConcurrentBloomFilter, error) { r.expectedBloomFilterDigest, uint(r.bloomFilterInfo.NumElementsM), uint(r.bloomFilterInfo.NumHashesK), + r.opts.ForceBloomFilterMmapMemory(), ) } diff --git a/src/dbnode/persist/fs/seek.go b/src/dbnode/persist/fs/seek.go index ef481fb90c..e5b5c4d855 100644 --- a/src/dbnode/persist/fs/seek.go +++ b/src/dbnode/persist/fs/seek.go @@ -260,6 +260,7 @@ func (s *seeker) Open(namespace ident.ID, shard uint32, blockStart time.Time) er expectedDigests.bloomFilterDigest, uint(s.bloomFilterInfo.NumElementsM), uint(s.bloomFilterInfo.NumHashesK), + s.opts.opts.ForceBloomFilterMmapMemory(), ) if err != nil { s.Close() From f33f93185646892fd077f69bce81b6c4017d34cd Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 10 Jan 2019 11:49:08 -0500 Subject: [PATCH 5/9] reuse mmap code --- src/dbnode/persist/fs/bloom_filter.go | 42 ++---------- src/dbnode/persist/fs/index_lookup.go | 41 +----------- src/dbnode/persist/fs/mmap_util.go | 95 +++++++++++++++++++++++++++ 3 files changed, 102 insertions(+), 76 deletions(-) create mode 100644 src/dbnode/persist/fs/mmap_util.go diff --git a/src/dbnode/persist/fs/bloom_filter.go b/src/dbnode/persist/fs/bloom_filter.go index f030a2b487..c171facc81 100644 --- a/src/dbnode/persist/fs/bloom_filter.go +++ b/src/dbnode/persist/fs/bloom_filter.go @@ -21,7 +21,6 @@ package fs import ( - "fmt" "os" "github.com/m3db/bloom" @@ -78,45 +77,12 @@ func newManagedConcurrentBloomFilterFromFile( ) (*ManagedConcurrentBloomFilter, error) { // Determine how many bytes to request for the mmap'd region bloomFilterFdWithDigest.Reset(bloomFilterFd) - stat, err := bloomFilterFd.Stat() + + bloomFilterMmap, err := validateAndMmap(bloomFilterFdWithDigest, expectedDigest, forceMmapMemory) if err != nil { return nil, err } - numBytes := stat.Size() - - var mmapBytes []byte - if forceMmapMemory { - // Request an anonymous (non-file-backed) mmap region. Note that we're going - // to use the mmap'd region to create a read-only bloom filter, but the mmap - // region itself needs to be writable so we can copy the bytes from disk - // into it - result, err := mmap.Bytes(numBytes, mmap.Options{Read: true, Write: true}) - if err != nil { - return nil, err - } - mmapBytes = result.Result - - // Validate the bytes on disk using the digest, and read them into - // the mmap'd region - _, err = bloomFilterFdWithDigest.ReadAllAndValidate(mmapBytes, expectedDigest) - if err != nil { - mmap.Munmap(mmapBytes) - return nil, err - } - } else { - mmapResult, err := mmap.File(bloomFilterFdWithDigest.Fd(), mmap.Options{Read: true, Write: false}) - if err != nil { - return nil, err - } - - mmapBytes = mmapResult.Result - if calculatedDigest := digest.Checksum(mmapBytes); calculatedDigest != expectedDigest { - mmap.Munmap(mmapBytes) - return nil, fmt.Errorf("expected summaries file digest was: %d, but got: %d", - expectedDigest, calculatedDigest) - } - } - bloomFilter := bloom.NewConcurrentReadOnlyBloomFilter(numElementsM, numHashesK, mmapBytes) - return newManagedConcurrentBloomFilter(bloomFilter, mmapBytes), nil + bloomFilter := bloom.NewConcurrentReadOnlyBloomFilter(numElementsM, numHashesK, bloomFilterMmap) + return newManagedConcurrentBloomFilter(bloomFilter, bloomFilterMmap), nil } diff --git a/src/dbnode/persist/fs/index_lookup.go b/src/dbnode/persist/fs/index_lookup.go index 750a64fca4..5042df0dae 100644 --- a/src/dbnode/persist/fs/index_lookup.go +++ b/src/dbnode/persist/fs/index_lookup.go @@ -161,50 +161,15 @@ func (il *nearestIndexOffsetLookup) close() error { // the summaries file is sorted (which it always should be). func newNearestIndexOffsetLookupFromSummariesFile( summariesFdWithDigest digest.FdWithDigestReader, - expectedSummariesDigest uint32, + expectedDigest uint32, decoder *xmsgpack.Decoder, numEntries int, forceMmapMemory bool, ) (*nearestIndexOffsetLookup, error) { - summariesFd := summariesFdWithDigest.Fd() - stat, err := summariesFd.Stat() + summariesMmap, err := validateAndMmap(summariesFdWithDigest, expectedDigest, forceMmapMemory) if err != nil { return nil, err } - numBytes := stat.Size() - - var summariesMmap []byte - - if forceMmapMemory { - // Request an anonymous (non-file-backed) mmap region. Note that we're going - // to use the mmap'd region to store the read-only summaries data, but the mmap - // region itself needs to be writable so we can copy the bytes from disk - // into it - mmapResult, err := mmap.Bytes(numBytes, mmap.Options{Read: true, Write: true}) - if err != nil { - return nil, err - } - summariesMmap = mmapResult.Result - - // Validate the bytes on disk using the digest, and read them into - // the mmap'd region - _, err = summariesFdWithDigest.ReadAllAndValidate(summariesMmap, expectedSummariesDigest) - if err != nil { - mmap.Munmap(summariesMmap) - return nil, err - } - } else { - mmapResult, err := mmap.File(summariesFdWithDigest.Fd(), mmap.Options{Read: true, Write: false}) - if err != nil { - return nil, err - } - summariesMmap = mmapResult.Result - if calculatedDigest := digest.Checksum(summariesMmap); calculatedDigest != expectedSummariesDigest { - mmap.Munmap(summariesMmap) - return nil, fmt.Errorf("expected summaries file digest was: %d, but got: %d", - expectedSummariesDigest, calculatedDigest) - } - } // Msgpack decode the entire summaries file (we need to store the offsets // for the entries so we can binary-search it) @@ -227,7 +192,7 @@ func newNearestIndexOffsetLookupFromSummariesFile( // if they're not. This should never happen as files should be sorted on disk. if lastReadID != nil && bytes.Compare(lastReadID, entry.ID) != -1 { mmap.Munmap(summariesMmap) - return nil, fmt.Errorf("summaries file is not sorted: %s", summariesFd.Name()) + return nil, fmt.Errorf("summaries file is not sorted: %s", summariesFdWithDigest.Fd().Name()) } summaryTokens = append(summaryTokens, summaryToken) lastReadID = entry.ID diff --git a/src/dbnode/persist/fs/mmap_util.go b/src/dbnode/persist/fs/mmap_util.go new file mode 100644 index 0000000000..15f7ec6e5c --- /dev/null +++ b/src/dbnode/persist/fs/mmap_util.go @@ -0,0 +1,95 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package fs + +import ( + "fmt" + + "github.com/m3db/m3/src/dbnode/digest" + "github.com/m3db/m3/src/x/mmap" +) + +func validateAndMmap( + fdWithDigest digest.FdWithDigestReader, + expectedDigest uint32, + forceMmapMemory bool, +) ([]byte, error) { + + if forceMmapMemory { + return validateAndMmapMemory(fdWithDigest, expectedDigest) + } + + return validateAndMmapFile(fdWithDigest, expectedDigest) + +} + +func validateAndMmapMemory( + fdWithDigest digest.FdWithDigestReader, + expectedDigest uint32, +) ([]byte, error) { + fd := fdWithDigest.Fd() + stat, err := fd.Stat() + if err != nil { + return nil, err + } + numBytes := stat.Size() + + // Request an anonymous (non-file-backed) mmap region. Note that we're going + // to use the mmap'd region to store the read-only summaries data, but the mmap + // region itself needs to be writable so we can copy the bytes from disk + // into it. + mmapResult, err := mmap.Bytes(numBytes, mmap.Options{Read: true, Write: true}) + if err != nil { + return nil, err + } + + mmapBytes := mmapResult.Result + + // Validate the bytes on disk using the digest, and read them into + // the mmap'd region + _, err = fdWithDigest.ReadAllAndValidate(mmapBytes, expectedDigest) + if err != nil { + mmap.Munmap(mmapBytes) + return nil, err + } + + return mmapBytes, nil +} + +func validateAndMmapFile( + fdWithDigest digest.FdWithDigestReader, + expectedDigest uint32, +) ([]byte, error) { + fd := fdWithDigest.Fd() + mmapResult, err := mmap.File(fd, mmap.Options{Read: true, Write: false}) + if err != nil { + return nil, err + } + + mmapBytes := mmapResult.Result + if calculatedDigest := digest.Checksum(mmapBytes); calculatedDigest != expectedDigest { + mmap.Munmap(mmapBytes) + return nil, fmt.Errorf("expected summaries file digest was: %d, but got: %d", + expectedDigest, calculatedDigest) + } + + return mmapBytes, nil +} From 496a30652de3b7799f1a846476d3319145d53e8b Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 10 Jan 2019 11:53:09 -0500 Subject: [PATCH 6/9] fix typo --- src/dbnode/persist/fs/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dbnode/persist/fs/options.go b/src/dbnode/persist/fs/options.go index 366cc4583c..776e245b5b 100644 --- a/src/dbnode/persist/fs/options.go +++ b/src/dbnode/persist/fs/options.go @@ -60,7 +60,7 @@ const ( defaultMmapHugePagesThreshold = 2 << 14 // 32kb (or when eclipsing 8 pages of default 4096 page size) // defaultForceIndexSummariesMmapMemory is the default configuration for whether the bytes for the index - // summaries file should be mmap'd as an anonymous region (force completely into memory) or mmap'd as a file. + // summaries file should be mmap'd as an anonymous region (forced completely into memory) or mmap'd as a file. defaultForceIndexSummariesMmapMemory = false // defaultForceIndexBloomFilterMmapMemory is the default configuration for whether the bytes for the bloom filter From 208bcdff03967e2256a2e1424dbf2cd700bd5677 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 10 Jan 2019 13:16:27 -0500 Subject: [PATCH 7/9] Add regression test for DecoderStream --- src/dbnode/persist/fs/msgpack/stream_test.go | 65 ++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/src/dbnode/persist/fs/msgpack/stream_test.go b/src/dbnode/persist/fs/msgpack/stream_test.go index 1663985144..86e6040081 100644 --- a/src/dbnode/persist/fs/msgpack/stream_test.go +++ b/src/dbnode/persist/fs/msgpack/stream_test.go @@ -21,12 +21,17 @@ package msgpack import ( + "bytes" "fmt" "io" + "runtime" "strings" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + msgpacklib "gopkg.in/vmihailenco/msgpack.v2" ) // Call Read to accumulate the text of a file @@ -163,3 +168,63 @@ func TestDecoderStreamUnreadByteMultiple(t *testing.T) { } } } + +type bufioWrapCheckReaderNotImpl struct { + wasWrappedByBufio bool +} + +func (f *bufioWrapCheckReaderNotImpl) Read(p []byte) (int, error) { + buf := make([]byte, 100000) + n := runtime.Stack(buf, false) + if n == 0 { + panic("runtime.Stack did not write anything") + } + if bytes.Contains(buf, []byte("bufio")) { + f.wasWrappedByBufio = true + } + + return 0, nil +} + +type bufioWrapCheckReader struct { + decoderStream + wasWrappedByBufio bool +} + +func (f *bufioWrapCheckReader) Read(p []byte) (int, error) { + buf := make([]byte, 100000) + n := runtime.Stack(buf, false) + if n == 0 { + panic("runtime.Stack did not write anything") + } + if bytes.Contains(buf, []byte("bufio")) { + f.wasWrappedByBufio = true + } + + return 0, nil +} + +// The underlying msgpack library that we use will attempt to wrap decoder streams +// that are passed to it that do not implement a specific interface in a bufio.Reader. +// This is wasteful for us because we're using mmap'd byte slices under the hood which +// which do not require buffered IO and each of the bufio.Readers() that the library +// allocates uses 4KiB of memory, and there can be hundreds of thousands of them. +// +// This test makes sure that our DecoderStream can be passed to the library without +// being wrapped by a bufio.Reader(). +func TestStreamCanBeUsedWithMsgpackLibraryNoBufio(t *testing.T) { + // First, make sure that we can actually detect if our reader gets + // wrapped in a bufio.Reader(). + wrapCheckNotImpl := &bufioWrapCheckReaderNotImpl{} + decoder := msgpacklib.NewDecoder(wrapCheckNotImpl) + decoder.DecodeArrayLen() + require.True(t, wrapCheckNotImpl.wasWrappedByBufio) + + // Next, make sure that anything implementing our DecoderStream + // interface won't get wrapped up in a bufio.Reader(). + wrapCheckImpl := &bufioWrapCheckReader{} + var _ DecoderStream = wrapCheckImpl // Make sure our fake one implements the iFace. + decoder = msgpacklib.NewDecoder(wrapCheckImpl) + decoder.DecodeArrayLen() + require.False(t, wrapCheckImpl.wasWrappedByBufio) +} From a57954a147660cedc1edae5d0c09906c27dc0cae Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 10 Jan 2019 13:21:02 -0500 Subject: [PATCH 8/9] fix config test --- src/cmd/services/m3dbnode/config/config_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index ae2736c4f0..e4ad9d69da 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -110,6 +110,8 @@ db: seekReadBufferSize: 4096 throughputLimitMbps: 100.0 throughputCheckEvery: 128 + force_index_summaries_mmap_memory: true + force_bloom_filter_mmap_memory: true repair: enabled: false @@ -391,6 +393,8 @@ db: newFileMode: null newDirectoryMode: null mmap: null + force_index_summaries_mmap_memory: true + force_bloom_filter_mmap_memory: true commitlog: flushMaxBytes: 524288 flushEvery: 1s From 8f0a6fccdd589bac670320bdf56c80f5696ac238 Mon Sep 17 00:00:00 2001 From: Richard Artoul Date: Thu, 10 Jan 2019 13:28:01 -0500 Subject: [PATCH 9/9] fix imports --- src/dbnode/persist/fs/msgpack/stream_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/src/dbnode/persist/fs/msgpack/stream_test.go b/src/dbnode/persist/fs/msgpack/stream_test.go index 86e6040081..6a0df24a5a 100644 --- a/src/dbnode/persist/fs/msgpack/stream_test.go +++ b/src/dbnode/persist/fs/msgpack/stream_test.go @@ -30,7 +30,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - msgpacklib "gopkg.in/vmihailenco/msgpack.v2" )