diff --git a/src/cmd/services/m3dbnode/config/config_test.go b/src/cmd/services/m3dbnode/config/config_test.go index ae2736c4f0..e4ad9d69da 100644 --- a/src/cmd/services/m3dbnode/config/config_test.go +++ b/src/cmd/services/m3dbnode/config/config_test.go @@ -110,6 +110,8 @@ db: seekReadBufferSize: 4096 throughputLimitMbps: 100.0 throughputCheckEvery: 128 + force_index_summaries_mmap_memory: true + force_bloom_filter_mmap_memory: true repair: enabled: false @@ -391,6 +393,8 @@ db: newFileMode: null newDirectoryMode: null mmap: null + force_index_summaries_mmap_memory: true + force_bloom_filter_mmap_memory: true commitlog: flushMaxBytes: 524288 flushEvery: 1s diff --git a/src/cmd/services/m3dbnode/config/fs.go b/src/cmd/services/m3dbnode/config/fs.go index 655866391d..8d74601efe 100644 --- a/src/cmd/services/m3dbnode/config/fs.go +++ b/src/cmd/services/m3dbnode/config/fs.go @@ -75,6 +75,14 @@ type FilesystemConfiguration struct { // Mmap is the mmap options which features are primarily platform dependent Mmap *MmapConfiguration `yaml:"mmap"` + + // ForceIndexSummariesMmapMemory forces the mmap that stores the index lookup bytes + // to be an anonymous region in memory as opposed to a file-based mmap. + ForceIndexSummariesMmapMemory bool `yaml:"force_index_summaries_mmap_memory"` + + // ForceBloomFilterMmapMemory forces the mmap that stores the bloom filter bytes + // to be an anonymous region in memory as opposed to a file-based mmap. + ForceBloomFilterMmapMemory bool `yaml:"force_bloom_filter_mmap_memory"` } // MmapConfiguration is the mmap configuration. 
diff --git a/src/dbnode/persist/fs/bloom_filter.go b/src/dbnode/persist/fs/bloom_filter.go index d27c13789f..c171facc81 100644 --- a/src/dbnode/persist/fs/bloom_filter.go +++ b/src/dbnode/persist/fs/bloom_filter.go @@ -73,34 +73,16 @@ func newManagedConcurrentBloomFilterFromFile( expectedDigest uint32, numElementsM uint, numHashesK uint, + forceMmapMemory bool, ) (*ManagedConcurrentBloomFilter, error) { // Determine how many bytes to request for the mmap'd region bloomFilterFdWithDigest.Reset(bloomFilterFd) - stat, err := bloomFilterFd.Stat() - if err != nil { - return nil, err - } - numBytes := stat.Size() - - // Request an anonymous (non-file-backed) mmap region. Note that we're going - // to use the mmap'd region to create a read-only bloom filter, but the mmap - // region itself needs to be writable so we can copy the bytes from disk - // into it - result, err := mmap.Bytes(numBytes, mmap.Options{Read: true, Write: true}) - if err != nil { - return nil, err - } - anonMmap := result.Result - // Validate the bytes on disk using the digest, and read them into - // the mmap'd region - _, err = bloomFilterFdWithDigest.ReadAllAndValidate( - anonMmap, expectedDigest) + bloomFilterMmap, err := validateAndMmap(bloomFilterFdWithDigest, expectedDigest, forceMmapMemory) if err != nil { - mmap.Munmap(anonMmap) return nil, err } - bloomFilter := bloom.NewConcurrentReadOnlyBloomFilter(numElementsM, numHashesK, anonMmap) - return newManagedConcurrentBloomFilter(bloomFilter, anonMmap), nil + bloomFilter := bloom.NewConcurrentReadOnlyBloomFilter(numElementsM, numHashesK, bloomFilterMmap) + return newManagedConcurrentBloomFilter(bloomFilter, bloomFilterMmap), nil } diff --git a/src/dbnode/persist/fs/index_lookup.go b/src/dbnode/persist/fs/index_lookup.go index ab60ab603e..5042df0dae 100644 --- a/src/dbnode/persist/fs/index_lookup.go +++ b/src/dbnode/persist/fs/index_lookup.go @@ -56,7 +56,7 @@ func newNearestIndexOffsetLookup( summaryIDsOffsets: summaryIDsOffsets, summariesMmap: 
summariesMmap, decoderStream: decoderStream, - msgpackDecoder: msgpack.NewDecoder(nil), + msgpackDecoder: msgpack.NewDecoder(decoderStream), isClone: false, } } @@ -66,11 +66,12 @@ func (il *nearestIndexOffsetLookup) concurrentClone() (*nearestIndexOffsetLookup return nil, errCloneShouldNotBeCloned } + decoderStream := xmsgpack.NewDecoderStream(nil) return &nearestIndexOffsetLookup{ summaryIDsOffsets: il.summaryIDsOffsets, summariesMmap: il.summariesMmap, - decoderStream: xmsgpack.NewDecoderStream(nil), - msgpackDecoder: msgpack.NewDecoder(nil), + decoderStream: decoderStream, + msgpackDecoder: msgpack.NewDecoder(decoderStream), isClone: true, }, nil } @@ -160,32 +161,13 @@ func (il *nearestIndexOffsetLookup) close() error { // the summaries file is sorted (which it always should be). func newNearestIndexOffsetLookupFromSummariesFile( summariesFdWithDigest digest.FdWithDigestReader, - expectedSummariesDigest uint32, + expectedDigest uint32, decoder *xmsgpack.Decoder, numEntries int, + forceMmapMemory bool, ) (*nearestIndexOffsetLookup, error) { - summariesFd := summariesFdWithDigest.Fd() - stat, err := summariesFd.Stat() - if err != nil { - return nil, err - } - numBytes := stat.Size() - - // Request an anonymous (non-file-backed) mmap region. 
Note that we're going - // to use the mmap'd region to store the read-only summaries data, but the mmap - // region itself needs to be writable so we can copy the bytes from disk - // into it - mmapResult, err := mmap.Bytes(numBytes, mmap.Options{Read: true, Write: true}) - if err != nil { - return nil, err - } - summariesMmap := mmapResult.Result - - // Validate the bytes on disk using the digest, and read them into - // the mmap'd region - _, err = summariesFdWithDigest.ReadAllAndValidate(summariesMmap, expectedSummariesDigest) + summariesMmap, err := validateAndMmap(summariesFdWithDigest, expectedDigest, forceMmapMemory) if err != nil { - mmap.Munmap(summariesMmap) return nil, err } @@ -210,7 +192,7 @@ func newNearestIndexOffsetLookupFromSummariesFile( // if they're not. This should never happen as files should be sorted on disk. if lastReadID != nil && bytes.Compare(lastReadID, entry.ID) != -1 { mmap.Munmap(summariesMmap) - return nil, fmt.Errorf("summaries file is not sorted: %s", summariesFd.Name()) + return nil, fmt.Errorf("summaries file is not sorted: %s", summariesFdWithDigest.Fd().Name()) } summaryTokens = append(summaryTokens, summaryToken) lastReadID = entry.ID diff --git a/src/dbnode/persist/fs/index_lookup_prop_test.go b/src/dbnode/persist/fs/index_lookup_prop_test.go index f03a257aaf..e7751bfd7e 100644 --- a/src/dbnode/persist/fs/index_lookup_prop_test.go +++ b/src/dbnode/persist/fs/index_lookup_prop_test.go @@ -118,7 +118,7 @@ func TestIndexLookupWriteRead(t *testing.T) { expectedSummariesDigest := calculateExpectedChecksum(t, summariesFilePath) decoder := msgpack.NewDecoder(options.DecodingOptions()) indexLookup, err := newNearestIndexOffsetLookupFromSummariesFile( - summariesFdWithDigest, expectedSummariesDigest, decoder, len(writes)) + summariesFdWithDigest, expectedSummariesDigest, decoder, len(writes), input.forceMmapMemory) if err != nil { return false, fmt.Errorf("err reading index lookup from summaries file: %v, ", err) } @@ -173,6 +173,9 
@@ type propTestInput struct { realWrites []generatedWrite // Shard number to use for the files shard uint32 + // Whether the summaries file bytes should be mmap'd as an + // anonymous region or file. + forceMmapMemory bool } type generatedWrite struct { @@ -196,10 +199,12 @@ func genPropTestInput(numRealWrites int) gopter.Gen { return gopter.CombineGens( gen.SliceOfN(numRealWrites, genWrite()), gen.UInt32(), + gen.Bool(), ).Map(func(vals []interface{}) propTestInput { return propTestInput{ - realWrites: vals[0].([]generatedWrite), - shard: vals[1].(uint32), + realWrites: vals[0].([]generatedWrite), + shard: vals[1].(uint32), + forceMmapMemory: vals[2].(bool), } }) } diff --git a/src/dbnode/persist/fs/index_lookup_test.go b/src/dbnode/persist/fs/index_lookup_test.go index 63217c28f3..7e6b0a181c 100644 --- a/src/dbnode/persist/fs/index_lookup_test.go +++ b/src/dbnode/persist/fs/index_lookup_test.go @@ -79,6 +79,7 @@ func TestNewNearestIndexOffsetDetectsUnsortedFiles(t *testing.T) { expectedDigest, msgpack.NewDecoder(nil), len(outOfOrderSummaries), + false, ) expectedErr := fmt.Errorf("summaries file is not sorted: %s", file.Name()) require.Equal(t, expectedErr, err) @@ -109,7 +110,7 @@ func TestClosingCloneDoesNotAffectParent(t *testing.T) { }, } - indexLookup := newIndexLookupWithSummaries(t, indexSummaries) + indexLookup := newIndexLookupWithSummaries(t, indexSummaries, false) clone, err := indexLookup.concurrentClone() require.NoError(t, err) require.NoError(t, clone.close()) @@ -125,6 +126,14 @@ func TestClosingCloneDoesNotAffectParent(t *testing.T) { } func TestParentAndClonesSafeForConcurrentUse(t *testing.T) { + testParentAndClonesSafeForConcurrentUse(t, false) +} + +func TestParentAndClonesSafeForConcurrentUseForceMmapMemory(t *testing.T) { + testParentAndClonesSafeForConcurrentUse(t, true) +} + +func testParentAndClonesSafeForConcurrentUse(t *testing.T, forceMmapMemory bool) { numSummaries := 1000 numClones := 10 @@ -140,7 +149,7 @@ func 
TestParentAndClonesSafeForConcurrentUse(t *testing.T) { sort.Sort(sortableSummaries(indexSummaries)) // Create indexLookup and associated clones - indexLookup := newIndexLookupWithSummaries(t, indexSummaries) + indexLookup := newIndexLookupWithSummaries(t, indexSummaries, forceMmapMemory) clones := []*nearestIndexOffsetLookup{} for i := 0; i < numClones; i++ { clone, err := indexLookup.concurrentClone() @@ -184,7 +193,8 @@ func TestParentAndClonesSafeForConcurrentUse(t *testing.T) { // newIndexLookupWithSummaries will return a new index lookup that is backed by the provided // indexSummaries (in the order that they are provided). -func newIndexLookupWithSummaries(t *testing.T, indexSummaries []schema.IndexSummary) *nearestIndexOffsetLookup { +func newIndexLookupWithSummaries( + t *testing.T, indexSummaries []schema.IndexSummary, forceMmapMemory bool) *nearestIndexOffsetLookup { // Create a temp file file, err := ioutil.TempFile("", "index-lookup-sort") require.NoError(t, err) @@ -211,6 +221,7 @@ func newIndexLookupWithSummaries(t *testing.T, indexSummaries []schema.IndexSumm expectedDigest, msgpack.NewDecoder(nil), len(indexSummaries), + forceMmapMemory, ) require.NoError(t, err) return indexLookup diff --git a/src/dbnode/persist/fs/mmap_util.go b/src/dbnode/persist/fs/mmap_util.go new file mode 100644 index 0000000000..15f7ec6e5c --- /dev/null +++ b/src/dbnode/persist/fs/mmap_util.go @@ -0,0 +1,95 @@ +// Copyright (c) 2019 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package fs + +import ( + "fmt" + + "github.com/m3db/m3/src/dbnode/digest" + "github.com/m3db/m3/src/x/mmap" +) + +func validateAndMmap( + fdWithDigest digest.FdWithDigestReader, + expectedDigest uint32, + forceMmapMemory bool, +) ([]byte, error) { + + if forceMmapMemory { + return validateAndMmapMemory(fdWithDigest, expectedDigest) + } + + return validateAndMmapFile(fdWithDigest, expectedDigest) + +} + +func validateAndMmapMemory( + fdWithDigest digest.FdWithDigestReader, + expectedDigest uint32, +) ([]byte, error) { + fd := fdWithDigest.Fd() + stat, err := fd.Stat() + if err != nil { + return nil, err + } + numBytes := stat.Size() + + // Request an anonymous (non-file-backed) mmap region. Note that we're going + // to use the mmap'd region to store the read-only file data, but the mmap + // region itself needs to be writable so we can copy the bytes from disk + // into it. 
+ mmapResult, err := mmap.Bytes(numBytes, mmap.Options{Read: true, Write: true}) + if err != nil { + return nil, err + } + + mmapBytes := mmapResult.Result + + // Validate the bytes on disk using the digest, and read them into + // the mmap'd region + _, err = fdWithDigest.ReadAllAndValidate(mmapBytes, expectedDigest) + if err != nil { + mmap.Munmap(mmapBytes) + return nil, err + } + + return mmapBytes, nil +} + +func validateAndMmapFile( + fdWithDigest digest.FdWithDigestReader, + expectedDigest uint32, +) ([]byte, error) { + fd := fdWithDigest.Fd() + mmapResult, err := mmap.File(fd, mmap.Options{Read: true, Write: false}) + if err != nil { + return nil, err + } + + mmapBytes := mmapResult.Result + if calculatedDigest := digest.Checksum(mmapBytes); calculatedDigest != expectedDigest { + mmap.Munmap(mmapBytes) + return nil, fmt.Errorf("expected summaries file digest was: %d, but got: %d", + expectedDigest, calculatedDigest) + } + + return mmapBytes, nil +} diff --git a/src/dbnode/persist/fs/msgpack/stream_test.go b/src/dbnode/persist/fs/msgpack/stream_test.go index 1663985144..6a0df24a5a 100644 --- a/src/dbnode/persist/fs/msgpack/stream_test.go +++ b/src/dbnode/persist/fs/msgpack/stream_test.go @@ -21,12 +21,16 @@ package msgpack import ( + "bytes" "fmt" "io" + "runtime" "strings" "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + msgpacklib "gopkg.in/vmihailenco/msgpack.v2" ) // Call Read to accumulate the text of a file @@ -163,3 +167,63 @@ func TestDecoderStreamUnreadByteMultiple(t *testing.T) { } } } + +type bufioWrapCheckReaderNotImpl struct { + wasWrappedByBufio bool +} + +func (f *bufioWrapCheckReaderNotImpl) Read(p []byte) (int, error) { + buf := make([]byte, 100000) + n := runtime.Stack(buf, false) + if n == 0 { + panic("runtime.Stack did not write anything") + } + if bytes.Contains(buf, []byte("bufio")) { + f.wasWrappedByBufio = true + } + + return 0, nil +} + +type bufioWrapCheckReader struct { + decoderStream + 
wasWrappedByBufio bool +} + +func (f *bufioWrapCheckReader) Read(p []byte) (int, error) { + buf := make([]byte, 100000) + n := runtime.Stack(buf, false) + if n == 0 { + panic("runtime.Stack did not write anything") + } + if bytes.Contains(buf, []byte("bufio")) { + f.wasWrappedByBufio = true + } + + return 0, nil +} + +// The underlying msgpack library that we use will attempt to wrap decoder streams +// that are passed to it that do not implement a specific interface in a bufio.Reader. +// This is wasteful for us because we're using mmap'd byte slices under the hood which +// do not require buffered IO and each of the bufio.Readers() that the library +// allocates uses 4KiB of memory, and there can be hundreds of thousands of them. +// +// This test makes sure that our DecoderStream can be passed to the library without +// being wrapped by a bufio.Reader(). +func TestStreamCanBeUsedWithMsgpackLibraryNoBufio(t *testing.T) { + // First, make sure that we can actually detect if our reader gets + // wrapped in a bufio.Reader(). + wrapCheckNotImpl := &bufioWrapCheckReaderNotImpl{} + decoder := msgpacklib.NewDecoder(wrapCheckNotImpl) + decoder.DecodeArrayLen() + require.True(t, wrapCheckNotImpl.wasWrappedByBufio) + + // Next, make sure that anything implementing our DecoderStream + // interface won't get wrapped up in a bufio.Reader(). + wrapCheckImpl := &bufioWrapCheckReader{} + var _ DecoderStream = wrapCheckImpl // Make sure our fake one implements the iFace. 
+ decoder = msgpacklib.NewDecoder(wrapCheckImpl) + decoder.DecodeArrayLen() + require.False(t, wrapCheckImpl.wasWrappedByBufio) +} diff --git a/src/dbnode/persist/fs/options.go b/src/dbnode/persist/fs/options.go index ee9ac3d476..776e245b5b 100644 --- a/src/dbnode/persist/fs/options.go +++ b/src/dbnode/persist/fs/options.go @@ -58,6 +58,14 @@ const ( // defaultMmapHugePagesThreshold is the default threshold for when to enable huge pages if enabled defaultMmapHugePagesThreshold = 2 << 14 // 32kb (or when eclipsing 8 pages of default 4096 page size) + + // defaultForceIndexSummariesMmapMemory is the default configuration for whether the bytes for the index + // summaries file should be mmap'd as an anonymous region (forced completely into memory) or mmap'd as a file. + defaultForceIndexSummariesMmapMemory = false + + // defaultForceIndexBloomFilterMmapMemory is the default configuration for whether the bytes for the bloom filter + // should be mmap'd as an anonymous region (forced completely into memory) or mmap'd as a file. 
+ defaultForceIndexBloomFilterMmapMemory = false ) var ( @@ -79,6 +87,8 @@ type options struct { newDirectoryMode os.FileMode indexSummariesPercent float64 indexBloomFilterFalsePositivePercent float64 + forceIndexSummariesMmapMemory bool + forceBloomFilterMmapMemory bool writerBufferSize int dataReaderBufferSize int infoReaderBufferSize int @@ -110,6 +120,8 @@ func NewOptions() Options { newDirectoryMode: defaultNewDirectoryMode, indexSummariesPercent: defaultIndexSummariesPercent, indexBloomFilterFalsePositivePercent: defaultIndexBloomFilterFalsePositivePercent, + forceIndexSummariesMmapMemory: defaultForceIndexSummariesMmapMemory, + forceBloomFilterMmapMemory: defaultForceIndexBloomFilterMmapMemory, writerBufferSize: defaultWriterBufferSize, dataReaderBufferSize: defaultDataReaderBufferSize, infoReaderBufferSize: defaultInfoReaderBufferSize, @@ -232,6 +244,26 @@ func (o *options) IndexBloomFilterFalsePositivePercent() float64 { return o.indexBloomFilterFalsePositivePercent } +func (o *options) SetForceIndexSummariesMmapMemory(value bool) Options { + opts := *o + opts.forceIndexSummariesMmapMemory = value + return &opts +} + +func (o *options) ForceIndexSummariesMmapMemory() bool { + return o.forceIndexSummariesMmapMemory +} + +func (o *options) SetForceBloomFilterMmapMemory(value bool) Options { + opts := *o + opts.forceBloomFilterMmapMemory = value + return &opts +} + +func (o *options) ForceBloomFilterMmapMemory() bool { + return o.forceBloomFilterMmapMemory +} + func (o *options) SetWriterBufferSize(value int) Options { opts := *o opts.writerBufferSize = value diff --git a/src/dbnode/persist/fs/read.go b/src/dbnode/persist/fs/read.go index 558c4d738e..efc3edc0ca 100644 --- a/src/dbnode/persist/fs/read.go +++ b/src/dbnode/persist/fs/read.go @@ -369,6 +369,7 @@ func (r *reader) ReadBloomFilter() (*ManagedConcurrentBloomFilter, error) { r.expectedBloomFilterDigest, uint(r.bloomFilterInfo.NumElementsM), uint(r.bloomFilterInfo.NumHashesK), + 
r.opts.ForceBloomFilterMmapMemory(), ) } diff --git a/src/dbnode/persist/fs/seek.go b/src/dbnode/persist/fs/seek.go index b80ffcc6ef..e5b5c4d855 100644 --- a/src/dbnode/persist/fs/seek.go +++ b/src/dbnode/persist/fs/seek.go @@ -260,6 +260,7 @@ func (s *seeker) Open(namespace ident.ID, shard uint32, blockStart time.Time) er expectedDigests.bloomFilterDigest, uint(s.bloomFilterInfo.NumElementsM), uint(s.bloomFilterInfo.NumHashesK), + s.opts.opts.ForceBloomFilterMmapMemory(), ) if err != nil { s.Close() @@ -272,6 +273,7 @@ func (s *seeker) Open(namespace ident.ID, shard uint32, blockStart time.Time) er expectedDigests.summariesDigest, s.decoder, int(s.summariesInfo.Summaries), + s.opts.opts.ForceIndexSummariesMmapMemory(), ) if err != nil { s.Close() diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 62fe4a1c53..b53077a0c4 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -374,6 +374,22 @@ type Options interface { // rate to use for the index bloom filter size and k hashes estimation IndexBloomFilterFalsePositivePercent() float64 + // SetForceIndexSummariesMmapMemory sets whether the summaries files will be mmap'd + // as an anonymous region, or as a file. + SetForceIndexSummariesMmapMemory(value bool) Options + + // ForceIndexSummariesMmapMemory returns whether the summaries files will be mmap'd + // as an anonymous region, or as a file. + ForceIndexSummariesMmapMemory() bool + + // SetForceBloomFilterMmapMemory sets whether the bloom filters will be mmap'd + // as an anonymous region, or as a file. + SetForceBloomFilterMmapMemory(value bool) Options + + // ForceBloomFilterMmapMemory returns whether the bloom filters will be mmap'd + // as an anonymous region, or as a file. 
+ ForceBloomFilterMmapMemory() bool + // SetWriterBufferSize sets the buffer size for writing TSDB files SetWriterBufferSize(value int) Options diff --git a/src/dbnode/server/server.go b/src/dbnode/server/server.go index 6e5ee5f0ee..0297209dfe 100644 --- a/src/dbnode/server/server.go +++ b/src/dbnode/server/server.go @@ -312,7 +312,9 @@ func Run(runOpts RunOptions) { SetMmapHugeTLBThreshold(mmapCfg.HugeTLB.Threshold). SetRuntimeOptionsManager(runtimeOptsMgr). SetTagEncoderPool(tagEncoderPool). - SetTagDecoderPool(tagDecoderPool) + SetTagDecoderPool(tagDecoderPool). + SetForceIndexSummariesMmapMemory(cfg.Filesystem.ForceIndexSummariesMmapMemory). + SetForceBloomFilterMmapMemory(cfg.Filesystem.ForceBloomFilterMmapMemory) var commitLogQueueSize int specified := cfg.CommitLog.Queue.Size