diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go
index 23622ab484..9fd1f353f2 100644
--- a/src/dbnode/persist/fs/retriever.go
+++ b/src/dbnode/persist/fs/retriever.go
@@ -89,12 +89,12 @@ const (
 type blockRetriever struct {
 	sync.RWMutex
 
-	opts            BlockRetrieverOptions
-	fsOpts          Options
-	logger          *zap.Logger
-	queryLimits     limits.QueryLimits
-	bytesReadLimit  limits.LookbackLimit
-	seriesReadCount tally.Counter
+	opts                    BlockRetrieverOptions
+	fsOpts                  Options
+	logger                  *zap.Logger
+	queryLimits             limits.QueryLimits
+	bytesReadLimit          limits.LookbackLimit
+	seriesBloomFilterMisses tally.Counter
 
 	newSeekerMgrFn newSeekerMgrFn
 
@@ -126,18 +126,18 @@ func NewBlockRetriever(
 	scope := fsOpts.InstrumentOptions().MetricsScope().SubScope("retriever")
 
 	return &blockRetriever{
-		opts:            opts,
-		fsOpts:          fsOpts,
-		logger:          fsOpts.InstrumentOptions().Logger(),
-		queryLimits:     opts.QueryLimits(),
-		bytesReadLimit:  opts.QueryLimits().BytesReadLimit(),
-		seriesReadCount: scope.Counter("series-read"),
-		newSeekerMgrFn:  NewSeekerManager,
-		reqPool:         opts.RetrieveRequestPool(),
-		bytesPool:       opts.BytesPool(),
-		idPool:          opts.IdentifierPool(),
-		status:          blockRetrieverNotOpen,
-		notifyFetch:     make(chan struct{}, 1),
+		opts:                    opts,
+		fsOpts:                  fsOpts,
+		logger:                  fsOpts.InstrumentOptions().Logger(),
+		queryLimits:             opts.QueryLimits(),
+		bytesReadLimit:          opts.QueryLimits().BytesReadLimit(),
+		seriesBloomFilterMisses: scope.Counter("series-bloom-filter-misses"),
+		newSeekerMgrFn:          NewSeekerManager,
+		reqPool:                 opts.RetrieveRequestPool(),
+		bytesPool:               opts.BytesPool(),
+		idPool:                  opts.IdentifierPool(),
+		status:                  blockRetrieverNotOpen,
+		notifyFetch:             make(chan struct{}, 1),
 		// We just close this channel when the fetchLoops should shutdown, so no
 		// buffering is required
 		fetchLoopsShouldShutdownCh: make(chan struct{}),
@@ -560,6 +560,33 @@ func (r *blockRetriever) fetchBatch(
 	}
 }
 
+func (r *blockRetriever) seriesPresentInBloomFilter(
+	id ident.ID,
+	shard uint32,
+	startTime time.Time,
+) (bool, error) {
+	// Capture seekerMgr under RLock() because it can be modified in the
+	// Open() method.
+	r.RLock()
+	seekerMgr := r.seekerMgr
+	r.RUnlock()
+	// This should never happen unless caller tries to use Stream() before Open().
+	if seekerMgr == nil {
+		return false, errNoSeekerMgr
+	}
+
+	idExists, err := seekerMgr.Test(id, shard, startTime)
+	if err != nil {
+		return false, err
+	}
+
+	if !idExists {
+		r.seriesBloomFilterMisses.Inc(1)
+	}
+
+	return idExists, nil
+}
+
-// streamRequest returns a bool indicating if the ID was found, and any errors.
+// streamRequest enqueues the stream request and returns any error encountered.
 func (r *blockRetriever) streamRequest(
 	ctx context.Context,
@@ -568,11 +595,10 @@ func (r *blockRetriever) streamRequest(
 	id ident.ID,
 	startTime time.Time,
 	nsCtx namespace.Context,
-) (bool, error) {
+) error {
 	req.resultWg.Add(1)
-	r.seriesReadCount.Inc(1)
 	if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil {
-		return false, err
+		return err
 	}
 
 	req.shard = shard
@@ -592,29 +618,9 @@ func (r *blockRetriever) streamRequest(
 	// Ensure to finalize at the end of request
 	ctx.RegisterFinalizer(req)
 
-	// Capture variable and RLock() because this slice can be modified in the
-	// Open() method
-	r.RLock()
-	// This should never happen unless caller tries to use Stream() before Open()
-	if r.seekerMgr == nil {
-		r.RUnlock()
-		return false, errNoSeekerMgr
-	}
-	r.RUnlock()
-
-	idExists, err := r.seekerMgr.Test(id, shard, startTime)
-	if err != nil {
-		return false, err
-	}
-
-	// If the ID is not in the seeker's bloom filter, then it's definitely not on
-	// disk and we can return immediately.
-	if !idExists {
-		return false, nil
-	}
 	reqs, err := r.shardRequests(shard)
 	if err != nil {
-		return false, err
+		return err
 	}
 
 	reqs.Lock()
@@ -633,7 +639,7 @@
 	// the data. This means that even though we're returning nil for error
 	// here, the caller may still encounter an error when they attempt to
 	// read the data.
-	return true, nil
+	return nil
 }
 
 func (r *blockRetriever) Stream(
@@ -644,6 +650,16 @@
 	onRetrieve block.OnRetrieveBlock,
 	nsCtx namespace.Context,
 ) (xio.BlockReader, error) {
+	found, err := r.seriesPresentInBloomFilter(id, shard, startTime)
+	if err != nil {
+		return xio.EmptyBlockReader, err
+	}
+	// If the ID is not in the seeker's bloom filter, then it's definitely not on
+	// disk and we can return immediately.
+	if !found {
+		return xio.EmptyBlockReader, nil
+	}
+
 	req := r.reqPool.Get()
 	req.onRetrieve = onRetrieve
 	req.streamReqType = streamDataReq
@@ -655,18 +671,12 @@
 		}
 	}
 
-	found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
+	err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
 	if err != nil {
 		req.resultWg.Done()
 		return xio.EmptyBlockReader, err
 	}
 
-	if !found {
-		req.onRetrieved(ts.Segment{}, namespace.Context{})
-		req.success = true
-		req.onDone()
-	}
-
 	// The request may not have completed yet, but it has an internal
 	// waitgroup which the caller will have to wait for before retrieving
 	// the data. This means that even though we're returning nil for error
@@ -683,22 +693,26 @@ func (r *blockRetriever) StreamWideEntry(
 	filter schema.WideEntryFilter,
 	nsCtx namespace.Context,
 ) (block.StreamedWideEntry, error) {
+	found, err := r.seriesPresentInBloomFilter(id, shard, startTime)
+	if err != nil {
+		return block.EmptyStreamedWideEntry, err
+	}
+	// If the ID is not in the seeker's bloom filter, then it's definitely not on
+	// disk and we can return immediately.
+	if !found {
+		return block.EmptyStreamedWideEntry, nil
+	}
+
 	req := r.reqPool.Get()
 	req.streamReqType = streamWideEntryReq
 	req.wideFilter = filter
 
-	found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
+	err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
 	if err != nil {
 		req.resultWg.Done()
 		return block.EmptyStreamedWideEntry, err
 	}
 
-	if !found {
-		req.wideEntry = xio.WideEntry{}
-		req.success = true
-		req.onDone()
-	}
-
 	// The request may not have completed yet, but it has an internal
 	// waitgroup which the caller will have to wait for before retrieving
 	// the data. This means that even though we're returning nil for error
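The retriever.go change above hoists the bloom-filter membership check out of streamRequest and into the callers: Stream and StreamWideEntry now ask seriesPresentInBloomFilter first and return an empty result on a definite miss, before a request is ever pulled from the pool. Below is a minimal, self-contained sketch of that early-exit pattern; membershipTester, mapTester, retriever, and stream are illustrative stand-ins invented for this sketch, not M3DB APIs.

// Sketch (illustrative names, not M3DB APIs) of the bloom-filter
// early-exit pattern introduced in the diff above.
package main

import (
	"fmt"
	"sync/atomic"
)

// membershipTester mimics the seeker manager's Test method: a bloom filter
// may report false positives, but never false negatives.
type membershipTester interface {
	Test(id string) (bool, error)
}

// mapTester is an exact membership test standing in for a probabilistic
// bloom filter; good enough for a sketch.
type mapTester map[string]bool

func (m mapTester) Test(id string) (bool, error) { return m[id], nil }

type retriever struct {
	filter membershipTester
	misses atomic.Int64 // analogous to the series-bloom-filter-misses counter
}

// stream consults the filter before doing any expensive work, mirroring how
// Stream and StreamWideEntry now call seriesPresentInBloomFilter first.
func (r *retriever) stream(id string) (string, error) {
	ok, err := r.filter.Test(id)
	if err != nil {
		return "", err
	}
	if !ok {
		// A negative answer is definitive: count the miss and return an
		// empty result without allocating a request or touching disk.
		r.misses.Add(1)
		return "", nil
	}
	return "data-for-" + id, nil // placeholder for the real seek-and-read path
}

func main() {
	r := &retriever{filter: mapTester{"exists": true}}
	for _, id := range []string{"exists", "not-exists"} {
		data, err := r.stream(id)
		fmt.Printf("id=%q data=%q err=%v\n", id, data, err)
	}
	fmt.Println("bloom filter misses:", r.misses.Load())
}

Because a bloom filter never yields false negatives, a negative Test answer proves the series is absent from disk, which is what makes the early return (and the dedicated miss counter) safe and meaningful.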
diff --git a/src/dbnode/persist/fs/retriever_test.go b/src/dbnode/persist/fs/retriever_test.go
index 54d57c0947..5e7178050a 100644
--- a/src/dbnode/persist/fs/retriever_test.go
+++ b/src/dbnode/persist/fs/retriever_test.go
@@ -127,7 +127,7 @@ type streamResult struct {
 	shard      uint32
 	id         string
 	blockStart time.Time
-	stream     xio.SegmentReader
+	stream     xio.BlockReader
 }
 
 // TestBlockRetrieverHighConcurrentSeeks tests the retriever with high
@@ -395,6 +395,14 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices
 	}
 
 	for _, r := range results {
+		compare.Head = shardData[r.shard][r.id][xtime.ToUnixNano(r.blockStart)]
+
+		// If the stream is empty, assert that the expected result is also nil
+		if r.stream.IsEmpty() {
+			require.Nil(t, compare.Head)
+			continue
+		}
+
 		seg, err := r.stream.Segment()
 		if err != nil {
 			fmt.Printf("\nstream seg err: %v\n", err)
@@ -404,7 +412,6 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices
 		}
 
 		require.NoError(t, err)
-		compare.Head = shardData[r.shard][r.id][xtime.ToUnixNano(r.blockStart)]
 		require.True(
 			t,
 			seg.Equal(&compare),
@@ -538,6 +545,8 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices
 // on the retriever in the case where the requested ID does not exist. In that
 // case, Stream() should return an empty segment.
 func TestBlockRetrieverIDDoesNotExist(t *testing.T) {
+	scope := tally.NewTestScope("test", nil)
+
 	// Make sure reader/writer are looking at the same test directory
 	dir, err := ioutil.TempDir("", "testdb")
 	require.NoError(t, err)
@@ -555,7 +564,7 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) {
 	// Setup the reader
 	opts := testBlockRetrieverOptions{
 		retrieverOpts: defaultTestBlockRetrieverOptions,
-		fsOpts:        fsOpts,
+		fsOpts:        fsOpts.SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope)),
 		shards:        []uint32{shard},
 	}
 	retriever, cleanup := newOpenTestBlockRetriever(t, testNs1Metadata(t), opts)
@@ -572,17 +581,18 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) {
 	assert.NoError(t, err)
 	closer()
 
-	// Make sure we return the correct error if the ID does not exist
 	ctx := context.NewContext()
 	defer ctx.Close()
 
 	segmentReader, err := retriever.Stream(ctx, shard, ident.StringID("not-exists"), blockStart, nil, nsCtx)
 	assert.NoError(t, err)
 
-	segment, err := segmentReader.Segment()
-	assert.NoError(t, err)
-	assert.Equal(t, nil, segment.Head)
-	assert.Equal(t, nil, segment.Tail)
+	assert.True(t, segmentReader.IsEmpty())
+
+	// Check that the bloom filter miss metric was incremented
+	snapshot := scope.Snapshot()
+	bloomFilterMisses := snapshot.Counters()["test.retriever.series-bloom-filter-misses+"]
+	require.Equal(t, int64(1), bloomFilterMisses.Value())
 }
 
 // TestBlockRetrieverOnlyCreatesTagItersIfTagsExists verifies that the block retriever
@@ -823,14 +833,12 @@ func TestLimitSeriesReadFromDisk(t *testing.T) {
 	require.NoError(t, err)
 	req := &retrieveRequest{}
 	retriever := publicRetriever.(*blockRetriever)
-	_, _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
-	_, err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
+	_ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
+	err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
 	require.Error(t, err)
 	require.Contains(t, err.Error(), "query aborted due to limit")
 
 	snapshot := scope.Snapshot()
-	seriesRead := snapshot.Counters()["test.retriever.series-read+"]
-	require.Equal(t, int64(2), seriesRead.Value())
 	seriesLimit := snapshot.Counters()["test.query-limit.exceeded+limit=disk-series-read"]
 	require.Equal(t, int64(1), seriesLimit.Value())
 }
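Both test changes lean on tally's in-memory test scope to assert that the new counter fires: the scope is wired into the retriever's instrument options, and the test then reads a snapshot keyed by the fully qualified metric name. The sketch below shows that assertion pattern in isolation, assuming the github.com/uber-go/tally and testify APIs already used in the diff; the test name and package are hypothetical, and the counter is incremented directly rather than by any real retriever.

// Standalone sketch of the tally test-scope assertion pattern used by the
// updated tests; test name and package are examples.
package fs_test

import (
	"testing"

	"github.com/stretchr/testify/require"
	"github.com/uber-go/tally"
)

func TestCounterIsIncremented(t *testing.T) {
	scope := tally.NewTestScope("test", nil)

	// The code under test would normally hold this counter and increment it
	// on a bloom filter miss; for the sketch we increment it directly.
	scope.SubScope("retriever").Counter("series-bloom-filter-misses").Inc(1)

	// Snapshot keys have the form "<scope>.<name>+<tags>"; with no tags the
	// key simply ends in "+", matching the lookups in the tests above.
	counters := scope.Snapshot().Counters()
	miss := counters["test.retriever.series-bloom-filter-misses+"]
	require.Equal(t, int64(1), miss.Value())
}

Using a snapshot rather than polling the live counter keeps the assertion race-free: the snapshot is an immutable copy of every metric registered under the scope at the moment it is taken.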