From e01a166cc8fbd6cbe16139fdf27fef1f80c31845 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Tue, 19 Jan 2021 17:09:21 -0500 Subject: [PATCH 1/3] [m3db] Check bloom filter before stream request allocation --- src/dbnode/persist/fs/retriever.go | 129 ++++++++++++++---------- src/dbnode/persist/fs/retriever_test.go | 20 ++-- 2 files changed, 85 insertions(+), 64 deletions(-) diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index 23622ab484..e9c77c74c8 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -89,12 +89,13 @@ const ( type blockRetriever struct { sync.RWMutex - opts BlockRetrieverOptions - fsOpts Options - logger *zap.Logger - queryLimits limits.QueryLimits - bytesReadLimit limits.LookbackLimit - seriesReadCount tally.Counter + opts BlockRetrieverOptions + fsOpts Options + logger *zap.Logger + queryLimits limits.QueryLimits + bytesReadLimit limits.LookbackLimit + seriesReadCount tally.Counter + seriesBloomFilterMisses tally.Counter newSeekerMgrFn newSeekerMgrFn @@ -126,18 +127,19 @@ func NewBlockRetriever( scope := fsOpts.InstrumentOptions().MetricsScope().SubScope("retriever") return &blockRetriever{ - opts: opts, - fsOpts: fsOpts, - logger: fsOpts.InstrumentOptions().Logger(), - queryLimits: opts.QueryLimits(), - bytesReadLimit: opts.QueryLimits().BytesReadLimit(), - seriesReadCount: scope.Counter("series-read"), - newSeekerMgrFn: NewSeekerManager, - reqPool: opts.RetrieveRequestPool(), - bytesPool: opts.BytesPool(), - idPool: opts.IdentifierPool(), - status: blockRetrieverNotOpen, - notifyFetch: make(chan struct{}, 1), + opts: opts, + fsOpts: fsOpts, + logger: fsOpts.InstrumentOptions().Logger(), + queryLimits: opts.QueryLimits(), + bytesReadLimit: opts.QueryLimits().BytesReadLimit(), + seriesReadCount: scope.Counter("series-read"), + seriesBloomFilterMisses: scope.Counter("series-bloom-filter-misses"), + newSeekerMgrFn: NewSeekerManager, + reqPool: opts.RetrieveRequestPool(), + 
bytesPool: opts.BytesPool(), + idPool: opts.IdentifierPool(), + status: blockRetrieverNotOpen, + notifyFetch: make(chan struct{}, 1), // We just close this channel when the fetchLoops should shutdown, so no // buffering is required fetchLoopsShouldShutdownCh: make(chan struct{}), @@ -560,6 +562,33 @@ func (r *blockRetriever) fetchBatch( } } +func (r *blockRetriever) checkSeriesInBloomFilter( + id ident.ID, + shard uint32, + startTime time.Time, +) (bool, error) { + // Capture variable and RLock() because this slice can be modified in the + // Open() method + r.RLock() + // This should never happen unless caller tries to use Stream() before Open() + if r.seekerMgr == nil { + r.RUnlock() + return false, errNoSeekerMgr + } + r.RUnlock() + + idExists, err := r.seekerMgr.Test(id, shard, startTime) + if err != nil { + return false, err + } + + if !idExists { + r.seriesBloomFilterMisses.Inc(1) + } + + return idExists, nil +} + // streamRequest returns a bool indicating if the ID was found, and any errors. 
func (r *blockRetriever) streamRequest( ctx context.Context, @@ -568,11 +597,11 @@ func (r *blockRetriever) streamRequest( id ident.ID, startTime time.Time, nsCtx namespace.Context, -) (bool, error) { +) error { req.resultWg.Add(1) r.seriesReadCount.Inc(1) if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil { - return false, err + return err } req.shard = shard @@ -592,29 +621,9 @@ func (r *blockRetriever) streamRequest( // Ensure to finalize at the end of request ctx.RegisterFinalizer(req) - // Capture variable and RLock() because this slice can be modified in the - // Open() method - r.RLock() - // This should never happen unless caller tries to use Stream() before Open() - if r.seekerMgr == nil { - r.RUnlock() - return false, errNoSeekerMgr - } - r.RUnlock() - - idExists, err := r.seekerMgr.Test(id, shard, startTime) - if err != nil { - return false, err - } - - // If the ID is not in the seeker's bloom filter, then it's definitely not on - // disk and we can return immediately. - if !idExists { - return false, nil - } reqs, err := r.shardRequests(shard) if err != nil { - return false, err + return err } reqs.Lock() @@ -633,7 +642,7 @@ func (r *blockRetriever) streamRequest( // the data. This means that even though we're returning nil for error // here, the caller may still encounter an error when they attempt to // read the data. - return true, nil + return nil } func (r *blockRetriever) Stream( @@ -644,6 +653,16 @@ func (r *blockRetriever) Stream( onRetrieve block.OnRetrieveBlock, nsCtx namespace.Context, ) (xio.BlockReader, error) { + found, err := r.checkSeriesInBloomFilter(id, shard, startTime) + if err != nil { + return xio.EmptyBlockReader, err + } + // If the ID is not in the seeker's bloom filter, then it's definitely not on + // disk and we can return immediately. 
+ if !found { + return xio.EmptyBlockReader, nil + } + req := r.reqPool.Get() req.onRetrieve = onRetrieve req.streamReqType = streamDataReq @@ -655,18 +674,12 @@ func (r *blockRetriever) Stream( } } - found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx) + err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx) if err != nil { req.resultWg.Done() return xio.EmptyBlockReader, err } - if !found { - req.onRetrieved(ts.Segment{}, namespace.Context{}) - req.success = true - req.onDone() - } - // The request may not have completed yet, but it has an internal // waitgroup which the caller will have to wait for before retrieving // the data. This means that even though we're returning nil for error @@ -683,22 +696,26 @@ func (r *blockRetriever) StreamWideEntry( filter schema.WideEntryFilter, nsCtx namespace.Context, ) (block.StreamedWideEntry, error) { + found, err := r.checkSeriesInBloomFilter(id, shard, startTime) + if err != nil { + return block.EmptyStreamedWideEntry, err + } + // If the ID is not in the seeker's bloom filter, then it's definitely not on + // disk and we can return immediately. + if !found { + return block.EmptyStreamedWideEntry, nil + } + req := r.reqPool.Get() req.streamReqType = streamWideEntryReq req.wideFilter = filter - found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx) + err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx) if err != nil { req.resultWg.Done() return block.EmptyStreamedWideEntry, err } - if !found { - req.wideEntry = xio.WideEntry{} - req.success = true - req.onDone() - } - // The request may not have completed yet, but it has an internal // waitgroup which the caller will have to wait for before retrieving // the data. 
This means that even though we're returning nil for error diff --git a/src/dbnode/persist/fs/retriever_test.go b/src/dbnode/persist/fs/retriever_test.go index 54d57c0947..7e1a371e4b 100644 --- a/src/dbnode/persist/fs/retriever_test.go +++ b/src/dbnode/persist/fs/retriever_test.go @@ -127,7 +127,7 @@ type streamResult struct { shard uint32 id string blockStart time.Time - stream xio.SegmentReader + stream xio.BlockReader } // TestBlockRetrieverHighConcurrentSeeks tests the retriever with high @@ -395,6 +395,14 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices } for _, r := range results { + compare.Head = shardData[r.shard][r.id][xtime.ToUnixNano(r.blockStart)] + + // If the stream is empty, assert that the expected result is also nil + if r.stream.IsEmpty() { + require.Nil(t, compare.Head) + continue + } + seg, err := r.stream.Segment() if err != nil { fmt.Printf("\nstream seg err: %v\n", err) @@ -404,7 +412,6 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices } require.NoError(t, err) - compare.Head = shardData[r.shard][r.id][xtime.ToUnixNano(r.blockStart)] require.True( t, seg.Equal(&compare), @@ -579,10 +586,7 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) { ident.StringID("not-exists"), blockStart, nil, nsCtx) assert.NoError(t, err) - segment, err := segmentReader.Segment() - assert.NoError(t, err) - assert.Equal(t, nil, segment.Head) - assert.Equal(t, nil, segment.Tail) + assert.True(t, segmentReader.IsEmpty()) } // TestBlockRetrieverOnlyCreatesTagItersIfTagsExists verifies that the block retriever @@ -823,8 +827,8 @@ func TestLimitSeriesReadFromDisk(t *testing.T) { require.NoError(t, err) req := &retrieveRequest{} retriever := publicRetriever.(*blockRetriever) - _, _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) - _, err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), 
namespace.Context{}) + _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) + err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{}) require.Error(t, err) require.Contains(t, err.Error(), "query aborted due to limit") From 3bea64660f51cd247b9a38e2d237b88c46fedd96 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Tue, 19 Jan 2021 17:20:57 -0500 Subject: [PATCH 2/3] Add test assertions for bloom filter misses metric --- src/dbnode/persist/fs/retriever.go | 6 +++--- src/dbnode/persist/fs/retriever_test.go | 10 ++++++++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index e9c77c74c8..5c3d24f038 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -562,7 +562,7 @@ func (r *blockRetriever) fetchBatch( } } -func (r *blockRetriever) checkSeriesInBloomFilter( +func (r *blockRetriever) seriesPresentInBloomFilter( id ident.ID, shard uint32, startTime time.Time, @@ -653,7 +653,7 @@ func (r *blockRetriever) Stream( onRetrieve block.OnRetrieveBlock, nsCtx namespace.Context, ) (xio.BlockReader, error) { - found, err := r.checkSeriesInBloomFilter(id, shard, startTime) + found, err := r.seriesPresentInBloomFilter(id, shard, startTime) if err != nil { return xio.EmptyBlockReader, err } @@ -696,7 +696,7 @@ func (r *blockRetriever) StreamWideEntry( filter schema.WideEntryFilter, nsCtx namespace.Context, ) (block.StreamedWideEntry, error) { - found, err := r.checkSeriesInBloomFilter(id, shard, startTime) + found, err := r.seriesPresentInBloomFilter(id, shard, startTime) if err != nil { return block.EmptyStreamedWideEntry, err } diff --git a/src/dbnode/persist/fs/retriever_test.go b/src/dbnode/persist/fs/retriever_test.go index 7e1a371e4b..05440b2986 100644 --- a/src/dbnode/persist/fs/retriever_test.go +++ 
b/src/dbnode/persist/fs/retriever_test.go @@ -545,6 +545,8 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices // on the retriever in the case where the requested ID does not exist. In that // case, Stream() should return an empty segment. func TestBlockRetrieverIDDoesNotExist(t *testing.T) { + scope := tally.NewTestScope("test", nil) + // Make sure reader/writer are looking at the same test directory dir, err := ioutil.TempDir("", "testdb") require.NoError(t, err) @@ -562,7 +564,7 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) { // Setup the reader opts := testBlockRetrieverOptions{ retrieverOpts: defaultTestBlockRetrieverOptions, - fsOpts: fsOpts, + fsOpts: fsOpts.SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope)), shards: []uint32{shard}, } retriever, cleanup := newOpenTestBlockRetriever(t, testNs1Metadata(t), opts) @@ -579,7 +581,6 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) { assert.NoError(t, err) closer() - // Make sure we return the correct error if the ID does not exist ctx := context.NewContext() defer ctx.Close() segmentReader, err := retriever.Stream(ctx, shard, @@ -587,6 +588,11 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) { assert.NoError(t, err) assert.True(t, segmentReader.IsEmpty()) + + // Check that the bloom filter miss metric was incremented + snapshot := scope.Snapshot() + seriesRead := snapshot.Counters()["test.retriever.series-bloom-filter-misses+"] + require.Equal(t, int64(1), seriesRead.Value()) } // TestBlockRetrieverOnlyCreatesTagItersIfTagsExists verifies that the block retriever From 7ea6e0002c42c34fa6d17926f1c5e465ce9de54f Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Tue, 19 Jan 2021 17:35:27 -0500 Subject: [PATCH 3/3] Remove redundant series-read metric --- src/dbnode/persist/fs/retriever.go | 3 --- src/dbnode/persist/fs/retriever_test.go | 2 -- 2 files changed, 5 deletions(-) diff --git a/src/dbnode/persist/fs/retriever.go 
b/src/dbnode/persist/fs/retriever.go index 5c3d24f038..9fd1f353f2 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -94,7 +94,6 @@ type blockRetriever struct { logger *zap.Logger queryLimits limits.QueryLimits bytesReadLimit limits.LookbackLimit - seriesReadCount tally.Counter seriesBloomFilterMisses tally.Counter newSeekerMgrFn newSeekerMgrFn @@ -132,7 +131,6 @@ func NewBlockRetriever( logger: fsOpts.InstrumentOptions().Logger(), queryLimits: opts.QueryLimits(), bytesReadLimit: opts.QueryLimits().BytesReadLimit(), - seriesReadCount: scope.Counter("series-read"), seriesBloomFilterMisses: scope.Counter("series-bloom-filter-misses"), newSeekerMgrFn: NewSeekerManager, reqPool: opts.RetrieveRequestPool(), @@ -599,7 +597,6 @@ func (r *blockRetriever) streamRequest( nsCtx namespace.Context, ) error { req.resultWg.Add(1) - r.seriesReadCount.Inc(1) if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil { return err } diff --git a/src/dbnode/persist/fs/retriever_test.go b/src/dbnode/persist/fs/retriever_test.go index 05440b2986..5e7178050a 100644 --- a/src/dbnode/persist/fs/retriever_test.go +++ b/src/dbnode/persist/fs/retriever_test.go @@ -839,8 +839,6 @@ func TestLimitSeriesReadFromDisk(t *testing.T) { require.Contains(t, err.Error(), "query aborted due to limit") snapshot := scope.Snapshot() - seriesRead := snapshot.Counters()["test.retriever.series-read+"] - require.Equal(t, int64(2), seriesRead.Value()) seriesLimit := snapshot.Counters()["test.query-limit.exceeded+limit=disk-series-read"] require.Equal(t, int64(1), seriesLimit.Value()) }