[m3db] Check bloom filter before stream request allocation #3103

Merged (3 commits) on Jan 19, 2021
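In outline, the change moves the seeker bloom-filter check in front of the pooled retrieve-request allocation: Stream() and StreamWideEntry() now call a new seriesPresentInBloomFilter helper first and return an empty result on a miss, so a request is only taken from the pool when the series might actually be on disk, and the old series-read counter is replaced by a series-bloom-filter-misses counter. The standalone sketch below only illustrates that control flow; the types and names in it are hypothetical stand-ins, not the m3db ones shown in the diff.

package main

import (
	"fmt"
	"sync"
)

// retrieveRequest stands in for the pooled request object.
type retrieveRequest struct{ id string }

type retriever struct {
	pool *sync.Pool
	// onDisk plays the role of the seeker manager's bloom filter: it can
	// answer "definitely not on disk" cheaply, before any allocation.
	onDisk map[string]bool
	misses int
}

func (r *retriever) stream(id string) string {
	// Consult the cheap membership test first: on a miss, return an empty
	// result without ever touching the request pool.
	if !r.onDisk[id] {
		r.misses++
		return ""
	}
	// Only a possible hit pays for a pooled request and a disk fetch.
	req := r.pool.Get().(*retrieveRequest)
	defer r.pool.Put(req)
	req.id = id
	return "data-for-" + id
}

func main() {
	r := &retriever{
		pool:   &sync.Pool{New: func() interface{} { return &retrieveRequest{} }},
		onDisk: map[string]bool{"foo": true},
	}
	fmt.Println(r.stream("foo"))        // data-for-foo
	fmt.Println(r.stream("not-exists")) // empty string, no pool allocation
	fmt.Println(r.misses)               // 1
}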
128 changes: 71 additions & 57 deletions src/dbnode/persist/fs/retriever.go
@@ -89,12 +89,12 @@ const (
type blockRetriever struct {
sync.RWMutex

opts BlockRetrieverOptions
fsOpts Options
logger *zap.Logger
queryLimits limits.QueryLimits
bytesReadLimit limits.LookbackLimit
seriesReadCount tally.Counter
opts BlockRetrieverOptions
fsOpts Options
logger *zap.Logger
queryLimits limits.QueryLimits
bytesReadLimit limits.LookbackLimit
seriesBloomFilterMisses tally.Counter

newSeekerMgrFn newSeekerMgrFn

@@ -126,18 +126,18 @@ func NewBlockRetriever(
scope := fsOpts.InstrumentOptions().MetricsScope().SubScope("retriever")

return &blockRetriever{
opts: opts,
fsOpts: fsOpts,
logger: fsOpts.InstrumentOptions().Logger(),
queryLimits: opts.QueryLimits(),
bytesReadLimit: opts.QueryLimits().BytesReadLimit(),
seriesReadCount: scope.Counter("series-read"),
newSeekerMgrFn: NewSeekerManager,
reqPool: opts.RetrieveRequestPool(),
bytesPool: opts.BytesPool(),
idPool: opts.IdentifierPool(),
status: blockRetrieverNotOpen,
notifyFetch: make(chan struct{}, 1),
opts: opts,
fsOpts: fsOpts,
logger: fsOpts.InstrumentOptions().Logger(),
queryLimits: opts.QueryLimits(),
bytesReadLimit: opts.QueryLimits().BytesReadLimit(),
seriesBloomFilterMisses: scope.Counter("series-bloom-filter-misses"),
newSeekerMgrFn: NewSeekerManager,
reqPool: opts.RetrieveRequestPool(),
bytesPool: opts.BytesPool(),
idPool: opts.IdentifierPool(),
status: blockRetrieverNotOpen,
notifyFetch: make(chan struct{}, 1),
// We just close this channel when the fetchLoops should shutdown, so no
// buffering is required
fetchLoopsShouldShutdownCh: make(chan struct{}),
@@ -560,6 +560,33 @@ func (r *blockRetriever) fetchBatch(
}
}

func (r *blockRetriever) seriesPresentInBloomFilter(
id ident.ID,
shard uint32,
startTime time.Time,
) (bool, error) {
// Capture variable and RLock() because this slice can be modified in the
// Open() method
r.RLock()
// This should never happen unless caller tries to use Stream() before Open()
if r.seekerMgr == nil {
r.RUnlock()
return false, errNoSeekerMgr
}
r.RUnlock()

idExists, err := r.seekerMgr.Test(id, shard, startTime)
Collaborator:
This should be the captured variable no?

r.RLock()
seekerMgr := r.seekerMgr
r.RUnlock()

if seekerMgr == nil {
	return false, errNoSeekerMgr
}

idExists, err := seekerMgr.Test(id, shard, startTime)

As per the comment:

	// Capture variable and RLock() because this slice can be modified in the
	// Open() method

if err != nil {
return false, err
}

if !idExists {
r.seriesBloomFilterMisses.Inc(1)
}

return idExists, nil
}

// streamRequest returns a bool indicating if the ID was found, and any errors.
func (r *blockRetriever) streamRequest(
ctx context.Context,
@@ -568,11 +595,10 @@ func (r *blockRetriever) streamRequest(
id ident.ID,
startTime time.Time,
nsCtx namespace.Context,
) (bool, error) {
) error {
req.resultWg.Add(1)
r.seriesReadCount.Inc(1)
if err := r.queryLimits.DiskSeriesReadLimit().Inc(1, req.source); err != nil {
return false, err
return err
}
req.shard = shard

@@ -592,29 +618,9 @@ func (r *blockRetriever) streamRequest(
// Ensure to finalize at the end of request
ctx.RegisterFinalizer(req)

// Capture variable and RLock() because this slice can be modified in the
// Open() method
r.RLock()
// This should never happen unless caller tries to use Stream() before Open()
if r.seekerMgr == nil {
r.RUnlock()
return false, errNoSeekerMgr
}
r.RUnlock()

idExists, err := r.seekerMgr.Test(id, shard, startTime)
if err != nil {
return false, err
}

// If the ID is not in the seeker's bloom filter, then it's definitely not on
// disk and we can return immediately.
if !idExists {
return false, nil
}
reqs, err := r.shardRequests(shard)
if err != nil {
return false, err
return err
}

reqs.Lock()
@@ -633,7 +639,7 @@
// the data. This means that even though we're returning nil for error
// here, the caller may still encounter an error when they attempt to
// read the data.
return true, nil
return nil
}

func (r *blockRetriever) Stream(
@@ -644,6 +650,16 @@ func (r *blockRetriever) Stream(
onRetrieve block.OnRetrieveBlock,
nsCtx namespace.Context,
) (xio.BlockReader, error) {
found, err := r.seriesPresentInBloomFilter(id, shard, startTime)
if err != nil {
return xio.EmptyBlockReader, err
}
// If the ID is not in the seeker's bloom filter, then it's definitely not on
// disk and we can return immediately.
if !found {
return xio.EmptyBlockReader, nil
}

req := r.reqPool.Get()
req.onRetrieve = onRetrieve
req.streamReqType = streamDataReq
@@ -655,18 +671,12 @@
}
}

found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
if err != nil {
req.resultWg.Done()
return xio.EmptyBlockReader, err
}

if !found {
req.onRetrieved(ts.Segment{}, namespace.Context{})
req.success = true
req.onDone()
}

// The request may not have completed yet, but it has an internal
// waitgroup which the caller will have to wait for before retrieving
// the data. This means that even though we're returning nil for error
@@ -683,22 +693,26 @@ func (r *blockRetriever) StreamWideEntry(
filter schema.WideEntryFilter,
nsCtx namespace.Context,
) (block.StreamedWideEntry, error) {
found, err := r.seriesPresentInBloomFilter(id, shard, startTime)
if err != nil {
return block.EmptyStreamedWideEntry, err
}
// If the ID is not in the seeker's bloom filter, then it's definitely not on
// disk and we can return immediately.
if !found {
return block.EmptyStreamedWideEntry, nil
}

req := r.reqPool.Get()
req.streamReqType = streamWideEntryReq
req.wideFilter = filter

found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
err = r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
if err != nil {
req.resultWg.Done()
return block.EmptyStreamedWideEntry, err
}

if !found {
req.wideEntry = xio.WideEntry{}
req.success = true
req.onDone()
}

// The request may not have completed yet, but it has an internal
// waitgroup which the caller will have to wait for before retrieving
// the data. This means that even though we're returning nil for error
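The review thread above asks that the captured seeker manager be used after the RLock(), rather than re-reading r.seekerMgr outside the lock. A minimal standalone illustration of that capture-under-read-lock pattern follows; the types here are hypothetical stand-ins, and only the Test call mirrors the real helper.

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNoSeekerMgr = errors.New("no seeker manager")

// seekerManager is a stand-in; only a Test method is needed here.
type seekerManager struct{ series map[string]bool }

func (s *seekerManager) Test(id string) (bool, error) { return s.series[id], nil }

type retriever struct {
	sync.RWMutex
	seekerMgr *seekerManager // assigned by Open(), hence the lock
}

// seriesPresent reads the field once under the read lock and then uses only
// the captured copy, so a concurrent Open() cannot swap it between the
// nil check and the Test call.
func (r *retriever) seriesPresent(id string) (bool, error) {
	r.RLock()
	mgr := r.seekerMgr
	r.RUnlock()

	if mgr == nil {
		return false, errNoSeekerMgr
	}
	return mgr.Test(id)
}

func main() {
	r := &retriever{}
	if _, err := r.seriesPresent("foo"); err != nil {
		fmt.Println("before Open():", err) // no seeker manager
	}

	r.Lock()
	r.seekerMgr = &seekerManager{series: map[string]bool{"foo": true}}
	r.Unlock()

	found, _ := r.seriesPresent("foo")
	fmt.Println("after Open():", found) // true
}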
32 changes: 20 additions & 12 deletions src/dbnode/persist/fs/retriever_test.go
@@ -127,7 +127,7 @@ type streamResult struct {
shard uint32
id string
blockStart time.Time
stream xio.SegmentReader
stream xio.BlockReader
}

// TestBlockRetrieverHighConcurrentSeeks tests the retriever with high
@@ -395,6 +395,14 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices
}

for _, r := range results {
compare.Head = shardData[r.shard][r.id][xtime.ToUnixNano(r.blockStart)]

// If the stream is empty, assert that the expected result is also nil
if r.stream.IsEmpty() {
require.Nil(t, compare.Head)
continue
}

seg, err := r.stream.Segment()
if err != nil {
fmt.Printf("\nstream seg err: %v\n", err)
@@ -404,7 +412,6 @@
}

require.NoError(t, err)
compare.Head = shardData[r.shard][r.id][xtime.ToUnixNano(r.blockStart)]
require.True(
t,
seg.Equal(&compare),
@@ -538,6 +545,8 @@ func testBlockRetrieverHighConcurrentSeeks(t *testing.T, shouldCacheShardIndices
// on the retriever in the case where the requested ID does not exist. In that
// case, Stream() should return an empty segment.
func TestBlockRetrieverIDDoesNotExist(t *testing.T) {
scope := tally.NewTestScope("test", nil)

// Make sure reader/writer are looking at the same test directory
dir, err := ioutil.TempDir("", "testdb")
require.NoError(t, err)
@@ -555,7 +564,7 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) {
// Setup the reader
opts := testBlockRetrieverOptions{
retrieverOpts: defaultTestBlockRetrieverOptions,
fsOpts: fsOpts,
fsOpts: fsOpts.SetInstrumentOptions(instrument.NewOptions().SetMetricsScope(scope)),
shards: []uint32{shard},
}
retriever, cleanup := newOpenTestBlockRetriever(t, testNs1Metadata(t), opts)
@@ -572,17 +581,18 @@ func TestBlockRetrieverIDDoesNotExist(t *testing.T) {
assert.NoError(t, err)
closer()

// Make sure we return the correct error if the ID does not exist
ctx := context.NewContext()
defer ctx.Close()
segmentReader, err := retriever.Stream(ctx, shard,
ident.StringID("not-exists"), blockStart, nil, nsCtx)
assert.NoError(t, err)

segment, err := segmentReader.Segment()
assert.NoError(t, err)
assert.Equal(t, nil, segment.Head)
assert.Equal(t, nil, segment.Tail)
assert.True(t, segmentReader.IsEmpty())

// Check that the bloom filter miss metric was incremented
snapshot := scope.Snapshot()
seriesRead := snapshot.Counters()["test.retriever.series-bloom-filter-misses+"]
require.Equal(t, int64(1), seriesRead.Value())
}

// TestBlockRetrieverOnlyCreatesTagItersIfTagsExists verifies that the block retriever
@@ -823,14 +833,12 @@ func TestLimitSeriesReadFromDisk(t *testing.T) {
require.NoError(t, err)
req := &retrieveRequest{}
retriever := publicRetriever.(*blockRetriever)
_, _ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
_, err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
_ = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
err = retriever.streamRequest(context.NewContext(), req, 0, ident.StringID("id"), time.Now(), namespace.Context{})
require.Error(t, err)
require.Contains(t, err.Error(), "query aborted due to limit")

snapshot := scope.Snapshot()
seriesRead := snapshot.Counters()["test.retriever.series-read+"]
require.Equal(t, int64(2), seriesRead.Value())
seriesLimit := snapshot.Counters()["test.query-limit.exceeded+limit=disk-series-read"]
require.Equal(t, int64(1), seriesLimit.Value())
}
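The counter assertion added to TestBlockRetrieverIDDoesNotExist can also be reproduced in isolation. The small program below uses tally's test scope the same way the test does; the scope prefix, sub-scope, and counter name follow the diff above, while everything else is a stand-in for the retriever incrementing the counter on a bloom filter miss.

package main

import (
	"fmt"

	"github.com/uber-go/tally"
)

func main() {
	scope := tally.NewTestScope("test", nil)

	// The retriever builds its counter from a "retriever" sub-scope, so the
	// snapshot key becomes "test.retriever.series-bloom-filter-misses+".
	misses := scope.SubScope("retriever").Counter("series-bloom-filter-misses")

	// Simulate one bloom filter miss.
	misses.Inc(1)

	snapshot := scope.Snapshot()
	counter := snapshot.Counters()["test.retriever.series-bloom-filter-misses+"]
	fmt.Println(counter.Value()) // 1
}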