diff --git a/glide.lock b/glide.lock index 0aabc287d8..59536492da 100644 --- a/glide.lock +++ b/glide.lock @@ -1,5 +1,5 @@ -hash: 88640e940000eacfdf8cce77ed94121794d32f16d40e1c52e37d93965677998f -updated: 2019-05-23T15:26:57.158244+02:00 +hash: 105536886b45a56d0e59ffc500254941bbe3d0ca9355e32bd94fb2d7a4deb9f8 +updated: 2019-06-12T15:04:52.880394+10:00 imports: - name: github.com/apache/thrift version: c2fb1c4e8c931d22617bebb0bf388cb4d5e6fcff @@ -11,6 +11,8 @@ imports: version: 4c0e84591b9aa9e6dcfdf3e020114cd81f89d5f9 subpackages: - quantile +- name: github.com/c2h5oh/datasize + version: 4eba002a5eaea69cf8d235a388fc6b65ae68d2dd - name: github.com/cespare/xxhash version: 48099fad606eafc26e3a569fad19ff510fff4df6 - name: github.com/cockroachdb/cmux diff --git a/glide.yaml b/glide.yaml index 1ad2779b78..85e76a3384 100644 --- a/glide.yaml +++ b/glide.yaml @@ -157,6 +157,9 @@ import: - package: github.com/hydrogen18/stalecucumber version: 9b38526d4bdf8e197c31344777fc28f7f48d250d + - package: github.com/c2h5oh/datasize + version: 4eba002a5eaea69cf8d235a388fc6b65ae68d2dd + # START_PROMETHEUS_DEPS - package: github.com/prometheus/prometheus version: 998dfcbac689ae832ea64ca134fcb096f61a7f62 diff --git a/src/dbnode/integration/commitlog_bootstrap_index_perf_speed_test.go b/src/dbnode/integration/commitlog_bootstrap_index_perf_speed_test.go new file mode 100644 index 0000000000..7d4a2dcf3e --- /dev/null +++ b/src/dbnode/integration/commitlog_bootstrap_index_perf_speed_test.go @@ -0,0 +1,221 @@ +// +build integration + +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package integration + +import ( + "encoding/binary" + "encoding/hex" + "fmt" + "io/ioutil" + "math/rand" + "os" + "strconv" + "testing" + "time" + + "github.com/m3db/m3/src/dbnode/clock" + "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" + "github.com/m3db/m3/src/dbnode/retention" + "github.com/m3db/m3/src/dbnode/ts" + "github.com/m3db/m3/src/x/checked" + "github.com/m3db/m3/src/x/context" + "github.com/m3db/m3/src/x/ident" + xtime "github.com/m3db/m3/src/x/time" + + "github.com/c2h5oh/datasize" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +// TestCommitLogIndexPerfSpeedBootstrap tests the performance of the commit log +// bootstrapper when a large amount of data is present that needs to be read. +// Note: Also useful for doing adhoc testing, using the environment variables +// to turn up and down the number of IDs and points. +func TestCommitLogIndexPerfSpeedBootstrap(t *testing.T) { + if testing.Short() { + t.SkipNow() // Just skip if we're doing a short run + } + + // Test setup + var ( + rOpts = retention.NewOptions().SetRetentionPeriod(12 * time.Hour) + blockSize = rOpts.BlockSize() + ) + + nsOpts := namespace.NewOptions(). + SetRetentionOptions(rOpts). + SetIndexOptions(namespace.NewIndexOptions(). + SetEnabled(true). + SetBlockSize(2 * blockSize)) + ns, err := namespace.NewMetadata(testNamespaces[0], nsOpts) + require.NoError(t, err) + opts := newTestOptions(t). + SetNamespaces([]namespace.Metadata{ns}). + // Allow for wall clock timing + SetNowFn(time.Now) + + setup, err := newTestSetup(t, opts, nil) + require.NoError(t, err) + defer setup.close() + + commitLogOpts := setup.storageOpts.CommitLogOptions(). + SetFlushInterval(defaultIntegrationTestFlushInterval) + setup.storageOpts = setup.storageOpts.SetCommitLogOptions(commitLogOpts) + + log := setup.storageOpts.InstrumentOptions().Logger() + log.Info("commit log bootstrap test") + + // Write test data + log.Info("generating data") + + // NB(r): Use TEST_NUM_SERIES=50000 for a representative large data set to + // test loading locally + numSeries := 1024 + if str := os.Getenv("TEST_NUM_SERIES"); str != "" { + numSeries, err = strconv.Atoi(str) + require.NoError(t, err) + } + + step := time.Second + numPoints := 128 + if str := os.Getenv("TEST_NUM_POINTS"); str != "" { + numPoints, err = strconv.Atoi(str) + require.NoError(t, err) + } + + require.True(t, (time.Duration(numPoints)*step) < blockSize, + fmt.Sprintf("num points %d multiplied by step %s is greater than block size %s", + numPoints, step.String(), blockSize.String())) + + numTags := 8 + if str := os.Getenv("TEST_NUM_TAGS"); str != "" { + numTags, err = strconv.Atoi(str) + require.NoError(t, err) + } + + numTagSets := 128 + if str := os.Getenv("TEST_NUM_TAG_SETS"); str != "" { + numTagSets, err = strconv.Atoi(str) + require.NoError(t, err) + } + + // Pre-generate tag sets, but not too many to reduce heap size. + tagSets := make([]ident.Tags, 0, numTagSets) + for i := 0; i < numTagSets; i++ { + tags := ident.NewTags() + for j := 0; j < numTags; j++ { + tag := ident.Tag{ + Name: ident.StringID(fmt.Sprintf("series.%d.tag.%d", i, j)), + Value: ident.StringID(fmt.Sprintf("series.%d.tag-value.%d", i, j)), + } + tags.Append(tag) + } + tagSets = append(tagSets, tags) + } + + log.Info("writing data") + + now := setup.getNowFn() + blockStart := now.Add(-3 * blockSize) + + // create new commit log + commitLog, err := commitlog.NewCommitLog(commitLogOpts) + require.NoError(t, err) + require.NoError(t, commitLog.Open()) + + // NB(r): Write points using no up front series metadata or point + // generation so that the memory usage is constant during the write phase + ctx := context.NewContext() + defer ctx.Close() + shardSet := setup.shardSet + idPrefix := "test.id.test.id.test.id.test.id.test.id.test.id.test.id.test.id" + idPrefixBytes := []byte(idPrefix) + checkedBytes := checked.NewBytes(nil, nil) + seriesID := ident.BinaryID(checkedBytes) + numBytes := make([]byte, 8) + numHexBytes := make([]byte, hex.EncodedLen(len(numBytes))) + for i := 0; i < numPoints; i++ { + for j := 0; j < numSeries; j++ { + // Write the ID prefix + checkedBytes.Resize(0) + checkedBytes.AppendAll(idPrefixBytes) + + // Write out the binary representation then hex encode the + // that into the ID to give it a unique ID for this series number + binary.LittleEndian.PutUint64(numBytes, uint64(j)) + hex.Encode(numHexBytes, numBytes) + checkedBytes.AppendAll(numHexBytes) + + // Use the tag sets appropriate for this series number + seriesTags := tagSets[j%len(tagSets)] + + series := ts.Series{ + Namespace: ns.ID(), + Shard: shardSet.Lookup(seriesID), + ID: seriesID, + Tags: seriesTags, + UniqueIndex: uint64(j), + } + dp := ts.Datapoint{ + Timestamp: blockStart.Add(time.Duration(i) * step), + Value: rand.Float64(), + } + require.NoError(t, commitLog.Write(ctx, series, dp, xtime.Second, nil)) + } + } + + // ensure writes finished + require.NoError(t, commitLog.Close()) + + log.Info("finished writing data") + + // emit how big commit logs are + commitLogsDirPath := fs.CommitLogsDirPath(commitLogOpts.FilesystemOptions().FilePathPrefix()) + files, err := ioutil.ReadDir(commitLogsDirPath) + require.NoError(t, err) + + log.Info("test wrote commit logs", zap.Int("numFiles", len(files))) + for _, file := range files { + log.Info("test wrote commit logs", + zap.String("file", file.Name()), + zap.String("size", datasize.ByteSize(file.Size()).HR())) + } + + // Setup bootstrapper after writing data so filesystem inspection can find it. + setupCommitLogBootstrapperWithFSInspection(t, setup, commitLogOpts) + + // restore now time so measurements take effect + setup.storageOpts = setup.storageOpts.SetClockOptions(clock.NewOptions()) + + // Start the server with filesystem bootstrapper + require.NoError(t, setup.startServer()) + log.Debug("server is now up") + + // Stop the server + defer func() { + require.NoError(t, setup.stopServer()) + log.Debug("server is now down") + }() +} diff --git a/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go b/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go index d11545fb88..38ab860c80 100644 --- a/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go +++ b/src/dbnode/integration/fs_commitlog_mixed_mode_read_write_test.go @@ -28,6 +28,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/integration/generate" + "github.com/m3db/m3/src/dbnode/namespace" persistfs "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/runtime" @@ -35,14 +36,13 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" bcl "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/commitlog" "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper/fs" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" xtime "github.com/m3db/m3/src/x/time" - "github.com/stretchr/testify/require" "github.com/m3db/m3/src/dbnode/testdata/prototest" + "github.com/stretchr/testify/require" ) type annotationGenerator interface { @@ -87,6 +87,8 @@ func testFsCommitLogMixedModeReadWrite(t *testing.T, setTestOpts setTestOptions, log := setup.storageOpts.InstrumentOptions().Logger() log.Info("commit log & fileset files, write, read, and merge bootstrap test") + filePathPrefix := setup.storageOpts.CommitLogOptions().FilesystemOptions().FilePathPrefix() + // setting time to 2017/02/13 15:30:10 fakeStart := time.Date(2017, time.February, 13, 15, 30, 10, 0, time.Local) blkStart15 := fakeStart.Truncate(ns1BlockSize) @@ -137,7 +139,7 @@ func testFsCommitLogMixedModeReadWrite(t *testing.T, setTestOpts setTestOptions, expectedFlushedData := datapoints.toSeriesMap(ns1BlockSize) delete(expectedFlushedData, xtime.ToUnixNano(blkStart18)) waitTimeout := 5 * time.Minute - filePathPrefix := setup.storageOpts.CommitLogOptions().FilesystemOptions().FilePathPrefix() + log.Info("waiting till expected fileset files have been written") require.NoError(t, waitUntilDataFilesFlushed(filePathPrefix, setup.shardSet, nsID, expectedFlushedData, waitTimeout)) log.Info("expected fileset files have been written") diff --git a/src/dbnode/integration/options.go b/src/dbnode/integration/options.go index 7bc351c875..6074150593 100644 --- a/src/dbnode/integration/options.go +++ b/src/dbnode/integration/options.go @@ -24,9 +24,9 @@ import ( "testing" "time" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/block" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/topology" "github.com/stretchr/testify/require" @@ -256,7 +256,7 @@ type testOptions interface { FilePathPrefix() string // SetProtoEncoding turns on proto encoder. - SetProtoEncoding (value bool) testOptions + SetProtoEncoding(value bool) testOptions // ProtoEncoding returns whether proto encoder is turned on. ProtoEncoding() bool @@ -267,6 +267,12 @@ type testOptions interface { // AssertTestDataEqual returns a comparator to compare two byte arrays. AssertTestDataEqual() assertTestDataEqual + + // SetNowFn will set the now fn. + SetNowFn(value func() time.Time) testOptions + + // NowFn returns the now fn. + NowFn() func() time.Time } type options struct { @@ -298,6 +304,7 @@ type options struct { writeNewSeriesAsync bool protoEncoding bool assertEqual assertTestDataEqual + nowFn func() time.Time } func newTestOptions(t *testing.T) testOptions { @@ -613,3 +620,13 @@ func (o *options) SetAssertTestDataEqual(value assertTestDataEqual) testOptions func (o *options) AssertTestDataEqual() assertTestDataEqual { return o.assertEqual } + +func (o *options) SetNowFn(value func() time.Time) testOptions { + opts := *o + opts.nowFn = value + return &opts +} + +func (o *options) NowFn() func() time.Time { + return o.nowFn +} diff --git a/src/dbnode/integration/setup.go b/src/dbnode/integration/setup.go index d9f622bb04..b7ba41462f 100644 --- a/src/dbnode/integration/setup.go +++ b/src/dbnode/integration/setup.go @@ -39,6 +39,7 @@ import ( "github.com/m3db/m3/src/dbnode/generated/thrift/rpc" "github.com/m3db/m3/src/dbnode/integration/fake" "github.com/m3db/m3/src/dbnode/integration/generate" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/sharding" @@ -47,7 +48,6 @@ import ( "github.com/m3db/m3/src/dbnode/storage/bootstrap/bootstrapper" "github.com/m3db/m3/src/dbnode/storage/cluster" "github.com/m3db/m3/src/dbnode/storage/index" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/storage/series" "github.com/m3db/m3/src/dbnode/testdata/prototest" "github.com/m3db/m3/src/dbnode/topology" @@ -82,7 +82,7 @@ var ( testProtoEqual = func(expect, actual []byte) bool { return prototest.ProtoEqual(testSchema, expect, actual) } - testProtoIter = prototest.NewProtoMessageIterator(testProtoMessages) + testProtoIter = prototest.NewProtoMessageIterator(testProtoMessages) ) // nowSetterFn is the function that sets the current time @@ -293,8 +293,14 @@ func newTestSetup(t *testing.T, opts testOptions, fsOpts fs.Options) (*testSetup now = t lock.Unlock() } - storageOpts = storageOpts.SetClockOptions( - storageOpts.ClockOptions().SetNowFn(getNowFn)) + if overrideTimeNow := opts.NowFn(); overrideTimeNow != nil { + // Allow overriding the frozen time + storageOpts = storageOpts.SetClockOptions( + storageOpts.ClockOptions().SetNowFn(overrideTimeNow)) + } else { + storageOpts = storageOpts.SetClockOptions( + storageOpts.ClockOptions().SetNowFn(getNowFn)) + } // Set up file path prefix idx := atomic.AddUint64(&created, 1) - 1 diff --git a/src/dbnode/persist/fs/commitlog/commit_log_test.go b/src/dbnode/persist/fs/commitlog/commit_log_test.go index 3ae894195c..3520659fe8 100644 --- a/src/dbnode/persist/fs/commitlog/commit_log_test.go +++ b/src/dbnode/persist/fs/commitlog/commit_log_test.go @@ -46,6 +46,22 @@ import ( "github.com/uber-go/tally" ) +// readAllSeriesPredicateTest is the same as ReadAllSeriesPredicate except +// it asserts that the ID and the namespace are not nil. +func readAllSeriesPredicateTest() SeriesFilterPredicate { + return func(id ident.ID, namespace ident.ID) bool { + if id == nil { + panic(fmt.Sprintf("series ID passed to series predicate is nil")) + } + + if namespace == nil { + panic(fmt.Sprintf("namespace ID passed to series predicate is nil")) + } + + return true + } +} + type overrides struct { clock *mclock.Mock flushInterval *time.Duration @@ -286,7 +302,7 @@ func assertCommitLogWritesByIterating(t *testing.T, l *commitLog, writes []testW iterOpts := IteratorOpts{ CommitLogOptions: l.opts, FileFilterPredicate: ReadAllPredicate(), - SeriesFilterPredicate: ReadAllSeriesPredicate(), + SeriesFilterPredicate: readAllSeriesPredicateTest(), } iter, corruptFiles, err := NewIterator(iterOpts) require.NoError(t, err) @@ -413,7 +429,7 @@ func TestReadCommitLogMissingMetadata(t *testing.T) { iterOpts := IteratorOpts{ CommitLogOptions: opts, FileFilterPredicate: ReadAllPredicate(), - SeriesFilterPredicate: ReadAllSeriesPredicate(), + SeriesFilterPredicate: readAllSeriesPredicateTest(), } iter, corruptFiles, err := NewIterator(iterOpts) require.NoError(t, err) @@ -461,7 +477,7 @@ func TestCommitLogReaderIsNotReusable(t *testing.T) { require.Equal(t, 2, len(files)) // Assert commitlog cannot be opened more than once - reader := newCommitLogReader(opts, ReadAllSeriesPredicate()) + reader := newCommitLogReader(opts, readAllSeriesPredicateTest()) _, err = reader.Open(files[0]) require.NoError(t, err) reader.Close() @@ -519,7 +535,7 @@ func TestCommitLogIteratorUsesPredicateFilterForNonCorruptFiles(t *testing.T) { iterOpts := IteratorOpts{ CommitLogOptions: opts, FileFilterPredicate: commitLogPredicate, - SeriesFilterPredicate: ReadAllSeriesPredicate(), + SeriesFilterPredicate: readAllSeriesPredicateTest(), } iter, corruptFiles, err := NewIterator(iterOpts) require.NoError(t, err) @@ -563,7 +579,7 @@ func TestCommitLogIteratorUsesPredicateFilterForCorruptFiles(t *testing.T) { iterOpts := IteratorOpts{ CommitLogOptions: opts, FileFilterPredicate: ReadAllPredicate(), - SeriesFilterPredicate: ReadAllSeriesPredicate(), + SeriesFilterPredicate: readAllSeriesPredicateTest(), } iter, corruptFiles, err := NewIterator(iterOpts) require.NoError(t, err) @@ -580,7 +596,7 @@ func TestCommitLogIteratorUsesPredicateFilterForCorruptFiles(t *testing.T) { iterOpts = IteratorOpts{ CommitLogOptions: opts, FileFilterPredicate: ignoreCorruptPredicate, - SeriesFilterPredicate: ReadAllSeriesPredicate(), + SeriesFilterPredicate: readAllSeriesPredicateTest(), } iter, corruptFiles, err = NewIterator(iterOpts) require.NoError(t, err) diff --git a/src/dbnode/persist/fs/commitlog/reader.go b/src/dbnode/persist/fs/commitlog/reader.go index db101eb9ac..c6ef49e24e 100644 --- a/src/dbnode/persist/fs/commitlog/reader.go +++ b/src/dbnode/persist/fs/commitlog/reader.go @@ -21,22 +21,20 @@ package commitlog import ( - "context" + "bytes" "encoding/binary" "errors" "io" "os" - "sync" - "sync/atomic" "time" "github.com/m3db/m3/src/dbnode/persist/fs/msgpack" "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/ts" - "github.com/m3db/m3/src/x/serialize" "github.com/m3db/m3/src/x/checked" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/serialize" xtime "github.com/m3db/m3/src/x/time" ) @@ -78,89 +76,43 @@ type commitLogReader interface { Close() error } -type readResponse struct { - series ts.Series - datapoint ts.Datapoint - unit xtime.Unit - annotation ts.Annotation - resultErr error -} +type reader struct { + opts Options -type decoderArg struct { - bytes []byte - err error - decodeRemainingToken msgpack.DecodeLogEntryRemainingToken - uniqueIndex uint64 - offset int - bufPool chan []byte -} + seriesPredicate SeriesFilterPredicate -type readerMetadata struct { - sync.RWMutex - numBlockedOrFinishedDecoders int64 -} + logEntryBytes []byte + tagDecoder serialize.TagDecoder + tagDecoderCheckedBytes checked.Bytes + checkedBytesPool pool.CheckedBytesPool + chunkReader *chunkReader + infoDecoder *msgpack.Decoder + infoDecoderStream msgpack.ByteDecoderStream + hasBeenOpened bool -type reader struct { - opts Options - numConc int64 - checkedBytesPool pool.CheckedBytesPool - chunkReader *chunkReader - infoDecoder *msgpack.Decoder - infoDecoderStream msgpack.ByteDecoderStream - decoderQueues []chan decoderArg - decoderBufPools []chan []byte - outChan chan readResponse - cancelCtx context.Context - cancelFunc context.CancelFunc - shutdownCh chan error - metadata readerMetadata - nextIndex int64 - hasBeenOpened bool - bgWorkersInitialized int64 - seriesPredicate SeriesFilterPredicate + metadataLookup map[uint64]seriesMetadata + namespacesRead []ident.ID } func newCommitLogReader(opts Options, seriesPredicate SeriesFilterPredicate) commitLogReader { - decodingOpts := opts.FilesystemOptions().DecodingOptions() - cancelCtx, cancelFunc := context.WithCancel(context.Background()) - - numConc := opts.ReadConcurrency() - decoderQueues := make([]chan decoderArg, 0, numConc) - decoderBufs := make([]chan []byte, 0, numConc) - for i := 0; i < numConc; i++ { - decoderQueues = append(decoderQueues, make(chan decoderArg, decoderInBufChanSize)) - - chanBufs := make(chan []byte, decoderInBufChanSize+1) - for i := 0; i < decoderInBufChanSize+1; i++ { - // Bufs will be resized as needed, so its ok if our default isn't big enough - chanBufs <- make([]byte, defaultDecodeEntryBufSize) - } - decoderBufs = append(decoderBufs, chanBufs) - } - outBuf := make(chan readResponse, decoderOutBufChanSize*numConc) - - reader := &reader{ - opts: opts, - numConc: int64(numConc), - checkedBytesPool: opts.BytesPool(), - chunkReader: newChunkReader(opts.FlushSize()), - infoDecoder: msgpack.NewDecoder(decodingOpts), - infoDecoderStream: msgpack.NewByteDecoderStream(nil), - decoderQueues: decoderQueues, - decoderBufPools: decoderBufs, - outChan: outBuf, - cancelCtx: cancelCtx, - cancelFunc: cancelFunc, - shutdownCh: make(chan error), - metadata: readerMetadata{}, - nextIndex: 0, - seriesPredicate: seriesPredicate, + tagDecoderCheckedBytes := checked.NewBytes(nil, nil) + tagDecoderCheckedBytes.IncRef() + return &reader{ + opts: opts, + seriesPredicate: seriesPredicate, + logEntryBytes: make([]byte, 0, opts.FlushSize()), + metadataLookup: make(map[uint64]seriesMetadata), + tagDecoder: opts.FilesystemOptions().TagDecoderPool().Get(), + tagDecoderCheckedBytes: tagDecoderCheckedBytes, + checkedBytesPool: opts.BytesPool(), + chunkReader: newChunkReader(opts.FlushSize()), + infoDecoder: msgpack.NewDecoder(opts.FilesystemOptions().DecodingOptions()), + infoDecoderStream: msgpack.NewByteDecoderStream(nil), } - return reader } func (r *reader) Open(filePath string) (int64, error) { - // Commitlog reader does not currently support being reused + // Commitlog reader does not currently support being reused. if r.hasBeenOpened { return 0, errCommitLogReaderIsNotReusable } @@ -178,351 +130,173 @@ func (r *reader) Open(filePath string) (int64, error) { return 0, err } index := info.Index - return index, nil } -// Read guarantees that the datapoints it returns will be in the same order as they are on disk -// for a given series, but they will not be in the same order they are on disk across series. -// I.E, if the commit log looked like this (letters are series and numbers are writes): -// A1, B1, B2, A2, C1, D1, D2, A3, B3, D2 -// Then the caller is guaranteed to receive A1 before A2 and A2 before A3, and they are guaranteed -// to see B1 before B2, but they may see B1 before A1 and D2 before B3. +func (r *reader) readInfo() (schema.LogInfo, error) { + err := r.readLogEntry() + if err != nil { + return emptyLogInfo, err + } + r.infoDecoderStream.Reset(r.logEntryBytes) + r.infoDecoder.Reset(r.infoDecoderStream) + return r.infoDecoder.DecodeLogInfo() +} + +// Read reads the next log entry in order. func (r *reader) Read() ( series ts.Series, datapoint ts.Datapoint, unit xtime.Unit, annotation ts.Annotation, - resultErr error, + err error, ) { - if r.nextIndex == 0 { - err := r.startBackgroundWorkers() - if err != nil { - return ts.Series{}, ts.Datapoint{}, xtime.Unit(0), ts.Annotation(nil), err - } - } - rr, ok := <-r.outChan - if !ok { - return ts.Series{}, ts.Datapoint{}, xtime.Unit(0), ts.Annotation(nil), io.EOF - } - r.nextIndex++ - return rr.series, rr.datapoint, rr.unit, rr.annotation, rr.resultErr -} - -func (r *reader) startBackgroundWorkers() error { - // Make sure background workers are never setup more than once - set := atomic.CompareAndSwapInt64(&r.bgWorkersInitialized, 0, 1) - if !set { - return errCommitLogReaderMultipleReadloops - } - - // Start background worker goroutines - go r.readLoop() - for _, decoderQueue := range r.decoderQueues { - localDecoderQueue := decoderQueue - go r.decoderLoop(localDecoderQueue, r.outChan) - } - - return nil -} - -func (r *reader) readLoop() { - defer func() { - for _, decoderQueue := range r.decoderQueues { - close(decoderQueue) - } - }() - - decodingOpts := r.opts.FilesystemOptions().DecodingOptions() - decoder := msgpack.NewDecoder(decodingOpts) - decoderStream := msgpack.NewByteDecoderStream(nil) - - reusedBytes := make([]byte, 0, r.opts.FlushSize()) - - for { - select { - case <-r.cancelCtx.Done(): - return - default: - data, err := r.readChunk(reusedBytes) - if err != nil { - if err == io.EOF { - return - } - - r.decoderQueues[0] <- decoderArg{ - bytes: nil, - err: err, - } - continue - } - - decoderStream.Reset(data) - decoder.Reset(decoderStream) - decodeRemainingToken, uniqueIndex, err := decoder.DecodeLogEntryUniqueIndex() - - // Grab a buffer from a pool specific to the decoder loop we're gonna send it to - shardedIdx := uniqueIndex % uint64(r.numConc) - bufPool := r.decoderBufPools[shardedIdx] - buf := <-bufPool - - // Resize the buffer as necessary - bufCap := len(buf) - dataLen := len(data) - if bufCap < dataLen { - diff := dataLen - bufCap - for i := 0; i < diff; i++ { - buf = append(buf, 0) - } - } - buf = buf[:dataLen] - - // Copy into the buffer - copy(buf, data) - - // Distribute work by the uniqueIndex so that each decoder loop is receiving - // all datapoints for a given series within relative order. - r.decoderQueues[shardedIdx] <- decoderArg{ - bytes: buf, - err: err, - decodeRemainingToken: decodeRemainingToken, - uniqueIndex: uniqueIndex, - offset: decoderStream.Offset(), - bufPool: bufPool, - } - } - } -} - -func (r *reader) decoderLoop(inBuf <-chan decoderArg, outBuf chan<- readResponse) { var ( - decodingOpts = r.opts.FilesystemOptions().DecodingOptions() - decoder = msgpack.NewDecoder(decodingOpts) - decoderStream = msgpack.NewByteDecoderStream(nil) - metadataDecoder = msgpack.NewDecoder(decodingOpts) - metadataDecoderStream = msgpack.NewByteDecoderStream(nil) - metadataLookup = make(map[uint64]seriesMetadata) - tagDecoder = r.opts.FilesystemOptions().TagDecoderPool().Get() - tagDecoderCheckedBytes = checked.NewBytes(nil, nil) + entry schema.LogEntry + metadata seriesMetadata ) - tagDecoderCheckedBytes.IncRef() - defer func() { - tagDecoderCheckedBytes.DecRef() - tagDecoderCheckedBytes.Finalize() - tagDecoder.Close() - }() - - for arg := range inBuf { - response := readResponse{} - // If there is a pre-existing error, just pipe it through - if arg.err != nil { - r.handleDecoderLoopIterationEnd(arg, outBuf, response, arg.err) - continue + for !metadata.passedPredicate { + err = r.readLogEntry() + if err != nil { + return ts.Series{}, ts.Datapoint{}, xtime.Unit(0), ts.Annotation(nil), err } - // Decode the log entry - decoderStream.Reset(arg.bytes[arg.offset:]) - decoder.Reset(decoderStream) - entry, err := decoder.DecodeLogEntryRemaining(arg.decodeRemainingToken, arg.uniqueIndex) + entry, err = msgpack.DecodeLogEntryFast(r.logEntryBytes) if err != nil { - r.handleDecoderLoopIterationEnd(arg, outBuf, response, err) - continue + return ts.Series{}, ts.Datapoint{}, xtime.Unit(0), ts.Annotation(nil), err } - // If the log entry has associated metadata, decode that as well - if len(entry.Metadata) != 0 { - err := r.decodeAndHandleMetadata(metadataLookup, metadataDecoder, metadataDecoderStream, - tagDecoder, tagDecoderCheckedBytes, entry) - if err != nil { - r.handleDecoderLoopIterationEnd(arg, outBuf, response, err) - continue - } + metadata, err = r.seriesMetadataForEntry(entry) + if err != nil { + return ts.Series{}, ts.Datapoint{}, xtime.Unit(0), ts.Annotation(nil), err } + } - metadata, hasMetadata := metadataLookup[entry.Index] - if !hasMetadata { - // In this case we know the commit log is corrupt because the commit log writer guarantees - // that the first entry for a series in the commit log includes its metadata. In addition, - // even though we are performing parallel decoding, the work is distributed to the workers - // based on the series unique index which means that all commit log entries for a given - // series are decoded in-order by a single decoder loop. - r.handleDecoderLoopIterationEnd(arg, outBuf, response, errCommitLogReaderMissingMetadata) - continue - } + series = metadata.Series - if !metadata.passedPredicate { - // Pass nil for outBuf because we don't want to send a readResponse along since this - // was just a series that the caller didn't want us to read. - r.handleDecoderLoopIterationEnd(arg, nil, readResponse{}, nil) - continue - } + datapoint = ts.Datapoint{ + Timestamp: time.Unix(0, entry.Timestamp), + Value: entry.Value, + } - response.series = metadata.Series + unit = xtime.Unit(entry.Unit) - response.datapoint = ts.Datapoint{ - Timestamp: time.Unix(0, entry.Timestamp), - Value: entry.Value, - } - response.unit = xtime.Unit(byte(entry.Unit)) + if len(entry.Annotation) > 0 { // Copy annotation to prevent reference to pooled byte slice - if len(entry.Annotation) > 0 { - response.annotation = append([]byte(nil), entry.Annotation...) - } - r.handleDecoderLoopIterationEnd(arg, outBuf, response, nil) + annotation = append(ts.Annotation(nil), ts.Annotation(entry.Annotation)...) } - r.metadata.Lock() - r.metadata.numBlockedOrFinishedDecoders++ - // If all of the decoders are either finished or blocked then we need to free - // any pending waiters. This also guarantees that the last decoderLoop to - // finish will free up any pending waiters (and by then any still-pending - // metadata is definitely missing from the commitlog) - if r.metadata.numBlockedOrFinishedDecoders >= r.numConc { - close(outBuf) - } - r.metadata.Unlock() + return series, datapoint, unit, annotation, nil } -func (r *reader) handleDecoderLoopIterationEnd(arg decoderArg, outBuf chan<- readResponse, response readResponse, err error) { - if arg.bytes != nil { - arg.bufPool <- arg.bytes +func (r *reader) readLogEntry() error { + // Read size of message + size, err := binary.ReadUvarint(r.chunkReader) + if err != nil { + return err } - if outBuf != nil { - response.resultErr = err - outBuf <- response + + // Extend buffer as necessary + r.logEntryBytes = resizeBufferOrGrowIfNeeded(r.logEntryBytes, int(size)) + + // Read message + if _, err := io.ReadFull(r.chunkReader, r.logEntryBytes); err != nil { + return err } + + return nil } -func (r *reader) decodeAndHandleMetadata( - metadataLookup map[uint64]seriesMetadata, - metadataDecoder *msgpack.Decoder, - metadataDecoderStream msgpack.ByteDecoderStream, - tagDecoder serialize.TagDecoder, - tagDecoderCheckedBytes checked.Bytes, +func (r *reader) seriesMetadataForEntry( entry schema.LogEntry, -) error { - metadataDecoderStream.Reset(entry.Metadata) - metadataDecoder.Reset(metadataDecoderStream) - decoded, err := metadataDecoder.DecodeLogMetadata() - if err != nil { - return err +) (seriesMetadata, error) { + metadata, ok := r.metadataLookup[entry.Index] + if ok { + // If the metadata already exists, we can skip this step. + return metadata, nil } - _, ok := metadataLookup[entry.Index] - if ok { - // If the metadata already exists, we can skip this step - return nil + if len(entry.Metadata) == 0 { + // Expected metadata but not encoded. + return seriesMetadata{}, errCommitLogReaderMissingMetadata + } + + decoded, err := msgpack.DecodeLogMetadataFast(entry.Metadata) + if err != nil { + return seriesMetadata{}, err } id := r.checkedBytesPool.Get(len(decoded.ID)) id.IncRef() id.AppendAll(decoded.ID) - namespace := r.checkedBytesPool.Get(len(decoded.Namespace)) - namespace.IncRef() - namespace.AppendAll(decoded.Namespace) + // Find or allocate the namespace ID. + var namespaceID ident.ID + for _, ns := range r.namespacesRead { + if bytes.Equal(ns.Bytes(), decoded.Namespace) { + namespaceID = ns + break + } + } + if namespaceID == nil { + // Take a copy and keep around to reuse later. + namespaceID = ident.BytesID(append([]byte(nil), decoded.Namespace...)) + r.namespacesRead = append(r.namespacesRead, namespaceID) + } var ( + idPool = r.opts.IdentifierPool() tags ident.Tags tagBytesLen = len(decoded.EncodedTags) ) if tagBytesLen != 0 { - tagDecoderCheckedBytes.Reset(decoded.EncodedTags) - tagDecoder.Reset(tagDecoderCheckedBytes) + r.tagDecoderCheckedBytes.Reset(decoded.EncodedTags) + r.tagDecoder.Reset(r.tagDecoderCheckedBytes) - idPool := r.opts.IdentifierPool() tags = idPool.Tags() - for tagDecoder.Next() { - curr := tagDecoder.Current() + for r.tagDecoder.Next() { + curr := r.tagDecoder.Current() tags.Append(idPool.CloneTag(curr)) } - err = tagDecoder.Err() + err = r.tagDecoder.Err() if err != nil { - return err + return seriesMetadata{}, err } } - metadata := ts.Series{ - UniqueIndex: entry.Index, - ID: ident.BinaryID(id), - Namespace: ident.BinaryID(namespace), - Shard: decoded.Shard, - Tags: tags, + seriesID := idPool.BinaryID(id) + metadata = seriesMetadata{ + Series: ts.Series{ + UniqueIndex: entry.Index, + ID: seriesID, + Namespace: namespaceID, + Shard: decoded.Shard, + Tags: tags, + }, + passedPredicate: r.seriesPredicate(seriesID, namespaceID), } - metadataLookup[entry.Index] = seriesMetadata{ - Series: metadata, - passedPredicate: r.seriesPredicate(metadata.ID, metadata.Namespace), - } + r.metadataLookup[entry.Index] = metadata - namespace.DecRef() id.DecRef() - return nil -} -func (r *reader) readChunk(buf []byte) ([]byte, error) { - // Read size of message - size, err := binary.ReadUvarint(r.chunkReader) - if err != nil { - return nil, err - } - - // Extend buffer as necessary - if len(buf) < int(size) { - diff := int(size) - len(buf) - for i := 0; i < diff; i++ { - buf = append(buf, 0) - } - } - - // Size target buffer for reading - buf = buf[:size] - - // Read message - if _, err := r.chunkReader.Read(buf); err != nil { - return nil, err - } - - return buf, nil -} - -func (r *reader) readInfo() (schema.LogInfo, error) { - data, err := r.readChunk([]byte{}) - if err != nil { - return emptyLogInfo, err - } - r.infoDecoderStream.Reset(data) - r.infoDecoder.Reset(r.infoDecoderStream) - logInfo, err := r.infoDecoder.DecodeLogInfo() - return logInfo, err + return metadata, nil } func (r *reader) Close() error { - // Background goroutines were never started, safe to close immediately. - if r.nextIndex == 0 { - return r.close() - } + return r.chunkReader.fd.Close() +} - // Shutdown the readLoop goroutine which will shut down the decoderLoops - // and close the fd - r.cancelFunc() - // Drain any unread data from the outBuffers to free any decoderLoops curently - // in a blocking write - for { - _, ok := <-r.outChan - r.nextIndex++ - if !ok { - break - } +func resizeBufferOrGrowIfNeeded(buf []byte, length int) []byte { + if cap(buf) >= length { + return buf[:length] } - return r.close() -} - -func (r *reader) close() error { - if r.chunkReader.fd == nil { - return nil + // If double is less than length requested, return that. + var newCap int + for newCap = 2 * cap(buf); newCap < length; newCap *= 2 { + // Double it again. } - return r.chunkReader.fd.Close() + return make([]byte, length, newCap) } diff --git a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go index 1e0e904974..1ddf631c37 100644 --- a/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go +++ b/src/dbnode/storage/bootstrap/bootstrapper/commitlog/source_prop_test.go @@ -36,11 +36,11 @@ import ( "github.com/m3db/m3/src/dbnode/digest" "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/encoding/m3tsz" + "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" "github.com/m3db/m3/src/dbnode/storage/bootstrap/result" - "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/topology" tu "github.com/m3db/m3/src/dbnode/topology/testutil" "github.com/m3db/m3/src/dbnode/ts"