Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Add wide filter #2949

Merged
merged 10 commits into from
Nov 30, 2020
2 changes: 2 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ linters:
# Requiring a new line before every return would force updates across a
# large fraction of the code base; it's not worth the perceived benefit.
- nlreturn
# Opinionated and sometimes wrong.
- paralleltest
disable-all: false
presets:
# bodyclose, errcheck, gosec, govet, scopelint, staticcheck, typecheck
Expand Down
17 changes: 9 additions & 8 deletions src/dbnode/persist/fs/fs_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions src/dbnode/persist/fs/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/limits"
Expand Down Expand Up @@ -314,7 +315,7 @@ func (r *blockRetriever) filterAndCompleteWideReqs(
retrieverResources.dataReqs = append(retrieverResources.dataReqs, req)

case streamWideEntryReq:
entry, err := seeker.SeekWideEntry(req.id, seekerResources)
entry, err := seeker.SeekWideEntry(req.id, req.wideFilter, seekerResources)
if err != nil {
if errors.Is(err, errSeekIDNotFound) {
// Missing, return empty result, successful lookup.
Expand All @@ -329,7 +330,7 @@ func (r *blockRetriever) filterAndCompleteWideReqs(

// Enqueue for fetch in batch in offset ascending order.
req.wideEntry = entry
entry.Shard = req.shard
req.wideEntry.Shard = req.shard
retrieverResources.appendWideEntryReq(req)

default:
Expand Down Expand Up @@ -663,10 +664,12 @@ func (r *blockRetriever) StreamWideEntry(
shard uint32,
id ident.ID,
startTime time.Time,
filter schema.WideEntryFilter,
nsCtx namespace.Context,
) (block.StreamedWideEntry, error) {
req := r.reqPool.Get()
req.streamReqType = streamWideEntryReq
req.wideFilter = filter

found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx)
if err != nil {
Expand Down Expand Up @@ -788,6 +791,7 @@ type retrieveRequest struct {
streamReqType streamReqType
indexEntry IndexEntry
wideEntry xio.WideEntry
wideFilter schema.WideEntryFilter
reader xio.SegmentReader

err error
Expand Down Expand Up @@ -965,6 +969,7 @@ func (req *retrieveRequest) resetForReuse() {
req.streamReqType = streamInvalidReq
req.indexEntry = IndexEntry{}
req.wideEntry = xio.WideEntry{}
req.wideFilter = nil
req.reader = nil
req.err = nil
req.notFound = false
Expand Down
11 changes: 11 additions & 0 deletions src/dbnode/persist/fs/seek.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ func (s *seeker) SeekIndexEntry(
// far we know it does not exist.
func (s *seeker) SeekWideEntry(
id ident.ID,
filter schema.WideEntryFilter,
resources ReusableSeekerResources,
) (xio.WideEntry, error) {
offset, err := s.indexLookup.getNearestIndexFileOffset(id, resources)
Expand Down Expand Up @@ -508,6 +509,16 @@ func (s *seeker) SeekWideEntry(
return xio.WideEntry{}, instrument.InvariantErrorf(err.Error())
}

if filter != nil {
filtered, err := filter(entry)
if err != nil || filtered {
// NB: this entry is not being taken, can free memory.
resources.decodeIndexEntryBytesPool.Put(entry.ID)
resources.decodeIndexEntryBytesPool.Put(entry.EncodedTags)
return xio.WideEntry{}, err
}
}

if status != xmsgpack.MatchedLookupStatus {
// No longer being used so we can return to the pool.
resources.decodeIndexEntryBytesPool.Put(entry.ID)
Expand Down
13 changes: 11 additions & 2 deletions src/dbnode/persist/fs/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs/msgpack"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
Expand Down Expand Up @@ -221,7 +222,11 @@ type DataFileSetSeeker interface {

// SeekWideEntry seeks in a manner similar to SeekIndexEntry, but
// instead yields a wide entry checksum of the series.
SeekWideEntry(id ident.ID, resources ReusableSeekerResources) (xio.WideEntry, error)
SeekWideEntry(
id ident.ID,
filter schema.WideEntryFilter,
resources ReusableSeekerResources,
) (xio.WideEntry, error)

// Range returns the time range associated with data in the volume
Range() xtime.Range
Expand Down Expand Up @@ -259,7 +264,11 @@ type ConcurrentDataFileSetSeeker interface {
SeekIndexEntry(id ident.ID, resources ReusableSeekerResources) (IndexEntry, error)

// SeekWideEntry is the same as in DataFileSetSeeker.
SeekWideEntry(id ident.ID, resources ReusableSeekerResources) (xio.WideEntry, error)
SeekWideEntry(
id ident.ID,
filter schema.WideEntryFilter,
resources ReusableSeekerResources,
) (xio.WideEntry, error)

// ConcurrentIDBloomFilter is the same as in DataFileSetSeeker.
ConcurrentIDBloomFilter() *ManagedConcurrentBloomFilter
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/persist/schema/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ type WideEntry struct {
MetadataChecksum int64
}

// WideEntryFilter provides a filter for wide entries.
//
// The filter is invoked with a candidate entry. A true result means the entry
// is filtered out, i.e. it is not taken by the caller; a false result means
// the entry is kept. A non-nil error reports that the filter itself failed,
// in which case the entry is likewise not taken and the error is returned to
// the caller. A nil WideEntryFilter may be passed where no filtering is
// wanted.
type WideEntryFilter func(entry WideEntry) (bool, error)

// IndexEntryHasher hashes an index entry.
type IndexEntryHasher interface {
// HashIndexEntry computes a hash value for this IndexEntry using its ID, tags,
Expand Down
17 changes: 9 additions & 8 deletions src/dbnode/storage/block/block_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion src/dbnode/storage/block/retriever_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/context"
Expand Down Expand Up @@ -116,10 +117,11 @@ func (r *shardBlockRetriever) StreamWideEntry(
ctx context.Context,
id ident.ID,
blockStart time.Time,
filter schema.WideEntryFilter,
nsCtx namespace.Context,
) (StreamedWideEntry, error) {
return r.DatabaseBlockRetriever.StreamWideEntry(ctx, r.shard, id,
blockStart, nsCtx)
blockStart, filter, nsCtx)
}

type shardBlockRetrieverManager struct {
Expand Down
3 changes: 3 additions & 0 deletions src/dbnode/storage/block/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/ts"
Expand Down Expand Up @@ -312,6 +313,7 @@ type DatabaseBlockRetriever interface {
shard uint32,
id ident.ID,
blockStart time.Time,
filter schema.WideEntryFilter,
nsCtx namespace.Context,
) (StreamedWideEntry, error)

Expand All @@ -336,6 +338,7 @@ type DatabaseShardBlockRetriever interface {
ctx context.Context,
id ident.ID,
blockStart time.Time,
filter schema.WideEntryFilter,
nsCtx namespace.Context,
) (StreamedWideEntry, error)
}
Expand Down
12 changes: 5 additions & 7 deletions src/dbnode/storage/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -941,12 +941,9 @@ func (d *db) ReadEncoded(
return n.ReadEncoded(ctx, id, start, end)
}

// batchProcessWideQuery runs the given query against the namespace index,
// iterating in a batchwise fashion across all matching IDs, applying the given
// IDBatchProcessor batch processing function to each ID discovered.
func (d *db) batchProcessWideQuery(
func (d *db) BatchProcessWideQuery(
ctx context.Context,
n databaseNamespace,
n Namespace,
query index.Query,
batchProcessor IDBatchProcessor,
opts index.WideQueryOptions,
Expand Down Expand Up @@ -1029,8 +1026,9 @@ func (d *db) WideQuery(
streamedWideEntries := make([]block.StreamedWideEntry, 0, batchSize)
indexChecksumProcessor := func(batch *ident.IDBatch) error {
streamedWideEntries = streamedWideEntries[:0]

for _, shardID := range batch.ShardIDs {
streamedWideEntry, err := n.FetchWideEntry(ctx, shardID.ID, start)
streamedWideEntry, err := n.FetchWideEntry(ctx, shardID.ID, start, nil)
if err != nil {
return err
}
Expand All @@ -1050,7 +1048,7 @@ func (d *db) WideQuery(
return nil
}

err = d.batchProcessWideQuery(ctx, n, query, indexChecksumProcessor, opts)
err = d.BatchProcessWideQuery(ctx, n, query, indexChecksumProcessor, opts)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/storage/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,7 @@ func TestWideQuery(t *testing.T) {
ns *MockdatabaseNamespace, d *db, q index.Query,
now time.Time, shards []uint32, iterOpts index.IterationOptions) {
ns.EXPECT().FetchWideEntry(gomock.Any(),
ident.StringID("foo"), gomock.Any()).
ident.StringID("foo"), gomock.Any(), nil).
Return(block.EmptyStreamedWideEntry, nil)

_, err := d.WideQuery(ctx, ident.StringID("testns"), q, now, shards, iterOpts)
Expand Down
4 changes: 3 additions & 1 deletion src/dbnode/storage/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage/block"
"github.com/m3db/m3/src/dbnode/storage/bootstrap"
Expand Down Expand Up @@ -922,6 +923,7 @@ func (n *dbNamespace) FetchWideEntry(
ctx context.Context,
id ident.ID,
blockStart time.Time,
filter schema.WideEntryFilter,
) (block.StreamedWideEntry, error) {
callStart := n.nowFn()
shard, nsCtx, err := n.readableShardFor(id)
Expand All @@ -931,7 +933,7 @@ func (n *dbNamespace) FetchWideEntry(
return block.EmptyStreamedWideEntry, err
}

res, err := shard.FetchWideEntry(ctx, id, blockStart, nsCtx)
res, err := shard.FetchWideEntry(ctx, id, blockStart, filter, nsCtx)
n.metrics.read.ReportSuccessOrError(err, n.nowFn().Sub(callStart))

return res, err
Expand Down
Loading