diff --git a/.golangci.yml b/.golangci.yml index 05855d38b6..206e53d24c 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -223,6 +223,8 @@ linters: # New line required before return would require a large fraction of the # code base to need updating, it's not worth the perceived benefit. - nlreturn + # Opinionated and sometimes wrong. + - paralleltest disable-all: false presets: # bodyclose, errcheck, gosec, govet, scopelint, staticcheck, typecheck diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index 25c89d94ca..cd40dd2d47 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" persist "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" @@ -523,18 +524,18 @@ func (mr *MockDataFileSetSeekerMockRecorder) SeekIndexEntry(arg0, arg1 interface } // SeekWideEntry mocks base method -func (m *MockDataFileSetSeeker) SeekWideEntry(arg0 ident.ID, arg1 ReusableSeekerResources) (xio.WideEntry, error) { +func (m *MockDataFileSetSeeker) SeekWideEntry(arg0 ident.ID, arg1 schema.WideEntryFilter, arg2 ReusableSeekerResources) (xio.WideEntry, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SeekWideEntry", arg0, arg1) + ret := m.ctrl.Call(m, "SeekWideEntry", arg0, arg1, arg2) ret0, _ := ret[0].(xio.WideEntry) ret1, _ := ret[1].(error) return ret0, ret1 } // SeekWideEntry indicates an expected call of SeekWideEntry -func (mr *MockDataFileSetSeekerMockRecorder) SeekWideEntry(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockDataFileSetSeekerMockRecorder) SeekWideEntry(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeekWideEntry", reflect.TypeOf((*MockDataFileSetSeeker)(nil).SeekWideEntry), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeekWideEntry", reflect.TypeOf((*MockDataFileSetSeeker)(nil).SeekWideEntry), arg0, arg1, arg2) } // MockIndexFileSetWriter is a mock of IndexFileSetWriter interface @@ -1259,18 +1260,18 @@ func (mr *MockConcurrentDataFileSetSeekerMockRecorder) SeekIndexEntry(arg0, arg1 } // SeekWideEntry mocks base method -func (m *MockConcurrentDataFileSetSeeker) SeekWideEntry(arg0 ident.ID, arg1 ReusableSeekerResources) (xio.WideEntry, error) { +func (m *MockConcurrentDataFileSetSeeker) SeekWideEntry(arg0 ident.ID, arg1 schema.WideEntryFilter, arg2 ReusableSeekerResources) (xio.WideEntry, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SeekWideEntry", arg0, arg1) + ret := m.ctrl.Call(m, "SeekWideEntry", arg0, arg1, arg2) ret0, _ := ret[0].(xio.WideEntry) ret1, _ := ret[1].(error) return ret0, ret1 } // SeekWideEntry indicates an expected call of SeekWideEntry -func (mr *MockConcurrentDataFileSetSeekerMockRecorder) SeekWideEntry(arg0, arg1 interface{}) *gomock.Call { +func (mr *MockConcurrentDataFileSetSeekerMockRecorder) SeekWideEntry(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeekWideEntry", reflect.TypeOf((*MockConcurrentDataFileSetSeeker)(nil).SeekWideEntry), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SeekWideEntry", reflect.TypeOf((*MockConcurrentDataFileSetSeeker)(nil).SeekWideEntry), arg0, arg1, arg2) } // MockMergeWith is a mock of MergeWith interface diff --git a/src/dbnode/persist/fs/retriever.go b/src/dbnode/persist/fs/retriever.go index 256eca347c..a64b76da83 100644 --- a/src/dbnode/persist/fs/retriever.go +++ b/src/dbnode/persist/fs/retriever.go @@ -40,6 +40,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/limits" @@ -314,7 +315,7 @@ func (r *blockRetriever) filterAndCompleteWideReqs( retrieverResources.dataReqs = append(retrieverResources.dataReqs, req) case streamWideEntryReq: - entry, err := seeker.SeekWideEntry(req.id, seekerResources) + entry, err := seeker.SeekWideEntry(req.id, req.wideFilter, seekerResources) if err != nil { if errors.Is(err, errSeekIDNotFound) { // Missing, return empty result, successful lookup. @@ -329,7 +330,7 @@ func (r *blockRetriever) filterAndCompleteWideReqs( // Enqueue for fetch in batch in offset ascending order. req.wideEntry = entry - entry.Shard = req.shard + req.wideEntry.Shard = req.shard retrieverResources.appendWideEntryReq(req) default: @@ -663,10 +664,12 @@ func (r *blockRetriever) StreamWideEntry( shard uint32, id ident.ID, startTime time.Time, + filter schema.WideEntryFilter, nsCtx namespace.Context, ) (block.StreamedWideEntry, error) { req := r.reqPool.Get() req.streamReqType = streamWideEntryReq + req.wideFilter = filter found, err := r.streamRequest(ctx, req, shard, id, startTime, nsCtx) if err != nil { @@ -788,6 +791,7 @@ type retrieveRequest struct { streamReqType streamReqType indexEntry IndexEntry wideEntry xio.WideEntry + wideFilter schema.WideEntryFilter reader xio.SegmentReader err error @@ -965,6 +969,7 @@ func (req *retrieveRequest) resetForReuse() { req.streamReqType = streamInvalidReq req.indexEntry = IndexEntry{} req.wideEntry = xio.WideEntry{} + req.wideFilter = nil req.reader = nil req.err = nil req.notFound = false diff --git a/src/dbnode/persist/fs/seek.go b/src/dbnode/persist/fs/seek.go index 956884134b..faa4fd3a9b 100644 --- a/src/dbnode/persist/fs/seek.go +++ b/src/dbnode/persist/fs/seek.go @@ -477,6 +477,7 @@ func (s *seeker) SeekIndexEntry( // far we know it does not exist. func (s *seeker) SeekWideEntry( id ident.ID, + filter schema.WideEntryFilter, resources ReusableSeekerResources, ) (xio.WideEntry, error) { offset, err := s.indexLookup.getNearestIndexFileOffset(id, resources) @@ -508,6 +509,16 @@ func (s *seeker) SeekWideEntry( return xio.WideEntry{}, instrument.InvariantErrorf(err.Error()) } + if filter != nil { + filtered, err := filter(entry) + if err != nil || filtered { + // NB: this entry is not being taken, can free memory. + resources.decodeIndexEntryBytesPool.Put(entry.ID) + resources.decodeIndexEntryBytesPool.Put(entry.EncodedTags) + return xio.WideEntry{}, err + } + } + if status != xmsgpack.MatchedLookupStatus { // No longer being used so we can return to the pool. resources.decodeIndexEntryBytesPool.Put(entry.ID) diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 9c89e6566a..d359e74f27 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs/msgpack" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/block" @@ -221,7 +222,11 @@ type DataFileSetSeeker interface { // SeekWideEntry seeks in a manner similar to SeekIndexEntry, but // instead yields a wide entry checksum of the series. - SeekWideEntry(id ident.ID, resources ReusableSeekerResources) (xio.WideEntry, error) + SeekWideEntry( + id ident.ID, + filter schema.WideEntryFilter, + resources ReusableSeekerResources, + ) (xio.WideEntry, error) // Range returns the time range associated with data in the volume Range() xtime.Range @@ -259,7 +264,11 @@ type ConcurrentDataFileSetSeeker interface { SeekIndexEntry(id ident.ID, resources ReusableSeekerResources) (IndexEntry, error) // SeekWideEntry is the same as in DataFileSetSeeker. - SeekWideEntry(id ident.ID, resources ReusableSeekerResources) (xio.WideEntry, error) + SeekWideEntry( + id ident.ID, + filter schema.WideEntryFilter, + resources ReusableSeekerResources, + ) (xio.WideEntry, error) // ConcurrentIDBloomFilter is the same as in DataFileSetSeeker. ConcurrentIDBloomFilter() *ManagedConcurrentBloomFilter diff --git a/src/dbnode/persist/schema/types.go b/src/dbnode/persist/schema/types.go index 52ab6b233d..a13968818b 100644 --- a/src/dbnode/persist/schema/types.go +++ b/src/dbnode/persist/schema/types.go @@ -85,6 +85,9 @@ type WideEntry struct { MetadataChecksum int64 } +// WideEntryFilter provides a filter for wide entries. +type WideEntryFilter func(entry WideEntry) (bool, error) + // IndexEntryHasher hashes an index entry. type IndexEntryHasher interface { // HashIndexEntry computes a hash value for this IndexEntry using its ID, tags, diff --git a/src/dbnode/storage/block/block_mock.go b/src/dbnode/storage/block/block_mock.go index a3462a1224..7f8b930ab9 100644 --- a/src/dbnode/storage/block/block_mock.go +++ b/src/dbnode/storage/block/block_mock.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" @@ -935,18 +936,18 @@ func (mr *MockDatabaseBlockRetrieverMockRecorder) Stream(ctx, shard, id, blockSt } // StreamWideEntry mocks base method -func (m *MockDatabaseBlockRetriever) StreamWideEntry(ctx context.Context, shard uint32, id ident.ID, blockStart time.Time, nsCtx namespace.Context) (StreamedWideEntry, error) { +func (m *MockDatabaseBlockRetriever) StreamWideEntry(ctx context.Context, shard uint32, id ident.ID, blockStart time.Time, filter schema.WideEntryFilter, nsCtx namespace.Context) (StreamedWideEntry, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StreamWideEntry", ctx, shard, id, blockStart, nsCtx) + ret := m.ctrl.Call(m, "StreamWideEntry", ctx, shard, id, blockStart, filter, nsCtx) ret0, _ := ret[0].(StreamedWideEntry) ret1, _ := ret[1].(error) return ret0, ret1 } // StreamWideEntry indicates an expected call of StreamWideEntry -func (mr *MockDatabaseBlockRetrieverMockRecorder) StreamWideEntry(ctx, shard, id, blockStart, nsCtx interface{}) *gomock.Call { +func (mr *MockDatabaseBlockRetrieverMockRecorder) StreamWideEntry(ctx, shard, id, blockStart, filter, nsCtx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamWideEntry", reflect.TypeOf((*MockDatabaseBlockRetriever)(nil).StreamWideEntry), ctx, shard, id, blockStart, nsCtx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamWideEntry", reflect.TypeOf((*MockDatabaseBlockRetriever)(nil).StreamWideEntry), ctx, shard, id, blockStart, filter, nsCtx) } // AssignShardSet mocks base method @@ -1000,18 +1001,18 @@ func (mr *MockDatabaseShardBlockRetrieverMockRecorder) Stream(ctx, id, blockStar } // StreamWideEntry mocks base method -func (m *MockDatabaseShardBlockRetriever) StreamWideEntry(ctx context.Context, id ident.ID, blockStart time.Time, nsCtx namespace.Context) (StreamedWideEntry, error) { +func (m *MockDatabaseShardBlockRetriever) StreamWideEntry(ctx context.Context, id ident.ID, blockStart time.Time, filter schema.WideEntryFilter, nsCtx namespace.Context) (StreamedWideEntry, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StreamWideEntry", ctx, id, blockStart, nsCtx) + ret := m.ctrl.Call(m, "StreamWideEntry", ctx, id, blockStart, filter, nsCtx) ret0, _ := ret[0].(StreamedWideEntry) ret1, _ := ret[1].(error) return ret0, ret1 } // StreamWideEntry indicates an expected call of StreamWideEntry -func (mr *MockDatabaseShardBlockRetrieverMockRecorder) StreamWideEntry(ctx, id, blockStart, nsCtx interface{}) *gomock.Call { +func (mr *MockDatabaseShardBlockRetrieverMockRecorder) StreamWideEntry(ctx, id, blockStart, filter, nsCtx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamWideEntry", reflect.TypeOf((*MockDatabaseShardBlockRetriever)(nil).StreamWideEntry), ctx, id, blockStart, nsCtx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamWideEntry", reflect.TypeOf((*MockDatabaseShardBlockRetriever)(nil).StreamWideEntry), ctx, id, blockStart, filter, nsCtx) } // MockDatabaseBlockRetrieverManager is a mock of DatabaseBlockRetrieverManager interface diff --git a/src/dbnode/storage/block/retriever_manager.go b/src/dbnode/storage/block/retriever_manager.go index 979735bf78..d618c70b3d 100644 --- a/src/dbnode/storage/block/retriever_manager.go +++ b/src/dbnode/storage/block/retriever_manager.go @@ -25,6 +25,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/x/xio" "github.com/m3db/m3/src/x/context" @@ -116,10 +117,11 @@ func (r *shardBlockRetriever) StreamWideEntry( ctx context.Context, id ident.ID, blockStart time.Time, + filter schema.WideEntryFilter, nsCtx namespace.Context, ) (StreamedWideEntry, error) { return r.DatabaseBlockRetriever.StreamWideEntry(ctx, r.shard, id, - blockStart, nsCtx) + blockStart, filter, nsCtx) } type shardBlockRetrieverManager struct { diff --git a/src/dbnode/storage/block/types.go b/src/dbnode/storage/block/types.go index 34a2161ab8..9d51f12e1a 100644 --- a/src/dbnode/storage/block/types.go +++ b/src/dbnode/storage/block/types.go @@ -25,6 +25,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/topology" "github.com/m3db/m3/src/dbnode/ts" @@ -312,6 +313,7 @@ type DatabaseBlockRetriever interface { shard uint32, id ident.ID, blockStart time.Time, + filter schema.WideEntryFilter, nsCtx namespace.Context, ) (StreamedWideEntry, error) @@ -336,6 +338,7 @@ type DatabaseShardBlockRetriever interface { ctx context.Context, id ident.ID, blockStart time.Time, + filter schema.WideEntryFilter, nsCtx namespace.Context, ) (StreamedWideEntry, error) } diff --git a/src/dbnode/storage/database.go b/src/dbnode/storage/database.go index 5d0a645d12..971bd92169 100644 --- a/src/dbnode/storage/database.go +++ b/src/dbnode/storage/database.go @@ -941,12 +941,9 @@ func (d *db) ReadEncoded( return n.ReadEncoded(ctx, id, start, end) } -// batchProcessWideQuery runs the given query against the namespace index, -// iterating in a batchwise fashion across all matching IDs, applying the given -// IDBatchProcessor batch processing function to each ID discovered. -func (d *db) batchProcessWideQuery( +func (d *db) BatchProcessWideQuery( ctx context.Context, - n databaseNamespace, + n Namespace, query index.Query, batchProcessor IDBatchProcessor, opts index.WideQueryOptions, @@ -1029,8 +1026,9 @@ func (d *db) WideQuery( streamedWideEntries := make([]block.StreamedWideEntry, 0, batchSize) indexChecksumProcessor := func(batch *ident.IDBatch) error { streamedWideEntries = streamedWideEntries[:0] + for _, shardID := range batch.ShardIDs { - streamedWideEntry, err := n.FetchWideEntry(ctx, shardID.ID, start) + streamedWideEntry, err := n.FetchWideEntry(ctx, shardID.ID, start, nil) if err != nil { return err } @@ -1050,7 +1048,7 @@ func (d *db) WideQuery( return nil } - err = d.batchProcessWideQuery(ctx, n, query, indexChecksumProcessor, opts) + err = d.BatchProcessWideQuery(ctx, n, query, indexChecksumProcessor, opts) if err != nil { return nil, err } diff --git a/src/dbnode/storage/database_test.go b/src/dbnode/storage/database_test.go index a843400f67..d0f669e77a 100644 --- a/src/dbnode/storage/database_test.go +++ b/src/dbnode/storage/database_test.go @@ -902,7 +902,7 @@ func TestWideQuery(t *testing.T) { ns *MockdatabaseNamespace, d *db, q index.Query, now time.Time, shards []uint32, iterOpts index.IterationOptions) { ns.EXPECT().FetchWideEntry(gomock.Any(), - ident.StringID("foo"), gomock.Any()). + ident.StringID("foo"), gomock.Any(), nil). Return(block.EmptyStreamedWideEntry, nil) _, err := d.WideQuery(ctx, ident.StringID("testns"), q, now, shards, iterOpts) diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go index 6a45b1cf71..c4bdbb4e62 100644 --- a/src/dbnode/storage/namespace.go +++ b/src/dbnode/storage/namespace.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/storage/bootstrap" @@ -922,6 +923,7 @@ func (n *dbNamespace) FetchWideEntry( ctx context.Context, id ident.ID, blockStart time.Time, + filter schema.WideEntryFilter, ) (block.StreamedWideEntry, error) { callStart := n.nowFn() shard, nsCtx, err := n.readableShardFor(id) @@ -931,7 +933,7 @@ func (n *dbNamespace) FetchWideEntry( return block.EmptyStreamedWideEntry, err } - res, err := shard.FetchWideEntry(ctx, id, blockStart, nsCtx) + res, err := shard.FetchWideEntry(ctx, id, blockStart, filter, nsCtx) n.metrics.read.ReportSuccessOrError(err, n.nowFn().Sub(callStart)) return res, err diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go index 8f64e67613..5e2e0e9346 100644 --- a/src/dbnode/storage/namespace_test.go +++ b/src/dbnode/storage/namespace_test.go @@ -303,6 +303,48 @@ func TestNamespaceReadEncodedShardOwned(t *testing.T) { require.Equal(t, errShardNotBootstrappedToRead, xerrors.GetInnerRetryableError(err)) } +func TestNamespaceFetchWideEntryShardNotOwned(t *testing.T) { + ctx := context.NewContext() + defer ctx.Close() + + ns, closer := newTestNamespace(t) + defer closer() + + for i := range ns.shards { + ns.shards[i] = nil + } + _, err := ns.FetchWideEntry(ctx, ident.StringID("foo"), time.Now(), nil) + require.Error(t, err) +} + +func TestNamespaceFetchWideEntryShardOwned(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ctx := context.NewContext() + defer ctx.Close() + + id := ident.StringID("foo") + start := time.Now() + + ns, closer := newTestNamespace(t) + defer closer() + + shard := NewMockdatabaseShard(ctrl) + shard.EXPECT().FetchWideEntry(ctx, id, start, gomock.Any(), gomock.Any()).Return(nil, nil) + ns.shards[testShardIDs[0].ID()] = shard + + shard.EXPECT().IsBootstrapped().Return(true) + _, err := ns.FetchWideEntry(ctx, id, start, nil) + require.NoError(t, err) + + shard.EXPECT().IsBootstrapped().Return(false) + _, err = ns.FetchWideEntry(ctx, id, start, nil) + require.Error(t, err) + require.True(t, xerrors.IsRetryableError(err)) + require.Equal(t, errShardNotBootstrappedToRead, xerrors.GetInnerRetryableError(err)) +} + func TestNamespaceFetchBlocksShardNotOwned(t *testing.T) { ctx := context.NewContext() defer ctx.Close() diff --git a/src/dbnode/storage/series/reader.go b/src/dbnode/storage/series/reader.go index 0fc9ccb44a..31498a5479 100644 --- a/src/dbnode/storage/series/reader.go +++ b/src/dbnode/storage/series/reader.go @@ -26,6 +26,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/namespace" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/x/xio" @@ -206,6 +207,7 @@ func (r *Reader) readersWithBlocksMapAndBufferAligned( func (r *Reader) FetchWideEntry( ctx context.Context, blockStart time.Time, + filter schema.WideEntryFilter, nsCtx namespace.Context, ) (block.StreamedWideEntry, error) { var ( @@ -232,7 +234,7 @@ func (r *Reader) FetchWideEntry( return block.EmptyStreamedWideEntry, nil } streamedEntry, err := r.retriever.StreamWideEntry(ctx, - r.id, blockStart, nsCtx) + r.id, blockStart, filter, nsCtx) if err != nil { return block.EmptyStreamedWideEntry, err } diff --git a/src/dbnode/storage/series/reader_test.go b/src/dbnode/storage/series/reader_test.go index 70090c785c..a0eb4f4633 100644 --- a/src/dbnode/storage/series/reader_test.go +++ b/src/dbnode/storage/series/reader_test.go @@ -102,11 +102,11 @@ func TestReaderUsingRetrieverWideEntrysBlockInvalid(t *testing.T) { retriever.EXPECT().IsBlockRetrievable(gomock.Any()). Return(false, errors.New("err")) - _, err := reader.FetchWideEntry(ctx, time.Now(), namespace.Context{}) + _, err := reader.FetchWideEntry(ctx, time.Now(), nil, namespace.Context{}) assert.EqualError(t, err, "err") retriever.EXPECT().IsBlockRetrievable(gomock.Any()).Return(false, nil) - e, err := reader.FetchWideEntry(ctx, time.Now(), namespace.Context{}) + e, err := reader.FetchWideEntry(ctx, time.Now(), nil, namespace.Context{}) assert.NoError(t, err) entry, err := e.RetrieveWideEntry() @@ -134,14 +134,14 @@ func TestReaderUsingRetrieverWideEntrys(t *testing.T) { retriever.EXPECT(). StreamWideEntry(ctx, ident.NewIDMatcher("foo"), - alignedStart, gomock.Any()). + alignedStart, nil, gomock.Any()). Return(streamedEntry, nil).Times(2) reader := NewReaderUsingRetriever( ident.StringID("foo"), retriever, nil, nil, opts) streamedEntry.EXPECT().RetrieveWideEntry().Return(xio.WideEntry{}, errors.New("err")) - streamed, err := reader.FetchWideEntry(ctx, alignedStart, namespace.Context{}) + streamed, err := reader.FetchWideEntry(ctx, alignedStart, nil, namespace.Context{}) require.NoError(t, err) _, err = streamed.RetrieveWideEntry() assert.EqualError(t, err, "err") @@ -153,7 +153,7 @@ func TestReaderUsingRetrieverWideEntrys(t *testing.T) { } streamedEntry.EXPECT().RetrieveWideEntry().Return(entry, nil) - streamed, err = reader.FetchWideEntry(ctx, alignedStart, namespace.Context{}) + streamed, err = reader.FetchWideEntry(ctx, alignedStart, nil, namespace.Context{}) require.NoError(t, err) actual, err := streamed.RetrieveWideEntry() require.NoError(t, err) diff --git a/src/dbnode/storage/series/series.go b/src/dbnode/storage/series/series.go index 4c008db977..63d079de1c 100644 --- a/src/dbnode/storage/series/series.go +++ b/src/dbnode/storage/series/series.go @@ -27,6 +27,7 @@ import ( "time" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" @@ -405,11 +406,12 @@ func (s *dbSeries) ReadEncoded( func (s *dbSeries) FetchWideEntry( ctx context.Context, blockStart time.Time, + filter schema.WideEntryFilter, nsCtx namespace.Context, ) (block.StreamedWideEntry, error) { s.RLock() reader := NewReaderUsingRetriever(s.id, s.blockRetriever, s.onRetrieveBlock, s, s.opts) - e, err := reader.FetchWideEntry(ctx, blockStart, nsCtx) + e, err := reader.FetchWideEntry(ctx, blockStart, filter, nsCtx) s.RUnlock() return e, err diff --git a/src/dbnode/storage/series/series_mock.go b/src/dbnode/storage/series/series_mock.go index cc20ef70b4..cf67c09355 100644 --- a/src/dbnode/storage/series/series_mock.go +++ b/src/dbnode/storage/series/series_mock.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/storage/block" "github.com/m3db/m3/src/dbnode/ts" "github.com/m3db/m3/src/dbnode/x/xio" @@ -150,18 +151,18 @@ func (mr *MockDatabaseSeriesMockRecorder) FetchBlocksMetadata(arg0, arg1, arg2, } // FetchWideEntry mocks base method -func (m *MockDatabaseSeries) FetchWideEntry(arg0 context.Context, arg1 time.Time, arg2 namespace.Context) (block.StreamedWideEntry, error) { +func (m *MockDatabaseSeries) FetchWideEntry(arg0 context.Context, arg1 time.Time, arg2 schema.WideEntryFilter, arg3 namespace.Context) (block.StreamedWideEntry, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchWideEntry", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "FetchWideEntry", arg0, arg1, arg2, arg3) ret0, _ := ret[0].(block.StreamedWideEntry) ret1, _ := ret[1].(error) return ret0, ret1 } // FetchWideEntry indicates an expected call of FetchWideEntry -func (mr *MockDatabaseSeriesMockRecorder) FetchWideEntry(arg0, arg1, arg2 interface{}) *gomock.Call { +func (mr *MockDatabaseSeriesMockRecorder) FetchWideEntry(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWideEntry", reflect.TypeOf((*MockDatabaseSeries)(nil).FetchWideEntry), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWideEntry", reflect.TypeOf((*MockDatabaseSeries)(nil).FetchWideEntry), arg0, arg1, arg2, arg3) } // ID mocks base method @@ -457,16 +458,16 @@ func (mr *MockQueryableBlockRetrieverMockRecorder) Stream(arg0, arg1, arg2, arg3 } // StreamWideEntry mocks base method -func (m *MockQueryableBlockRetriever) StreamWideEntry(arg0 context.Context, arg1 ident.ID, arg2 time.Time, arg3 namespace.Context) (block.StreamedWideEntry, error) { +func (m *MockQueryableBlockRetriever) StreamWideEntry(arg0 context.Context, arg1 ident.ID, arg2 time.Time, arg3 schema.WideEntryFilter, arg4 namespace.Context) (block.StreamedWideEntry, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StreamWideEntry", arg0, arg1, arg2, arg3) + ret := m.ctrl.Call(m, "StreamWideEntry", arg0, arg1, arg2, arg3, arg4) ret0, _ := ret[0].(block.StreamedWideEntry) ret1, _ := ret[1].(error) return ret0, ret1 } // StreamWideEntry indicates an expected call of StreamWideEntry -func (mr *MockQueryableBlockRetrieverMockRecorder) StreamWideEntry(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +func (mr *MockQueryableBlockRetrieverMockRecorder) StreamWideEntry(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamWideEntry", reflect.TypeOf((*MockQueryableBlockRetriever)(nil).StreamWideEntry), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StreamWideEntry", reflect.TypeOf((*MockQueryableBlockRetriever)(nil).StreamWideEntry), arg0, arg1, arg2, arg3, arg4) } diff --git a/src/dbnode/storage/series/types.go b/src/dbnode/storage/series/types.go index 779dcdd858..9f81928d1e 100644 --- a/src/dbnode/storage/series/types.go +++ b/src/dbnode/storage/series/types.go @@ -26,6 +26,7 @@ import ( "github.com/m3db/m3/src/dbnode/encoding" "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/block" @@ -90,6 +91,7 @@ type DatabaseSeries interface { FetchWideEntry( ctx context.Context, blockStart time.Time, + filter schema.WideEntryFilter, nsCtx namespace.Context, ) (block.StreamedWideEntry, error) diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 591686e98f..6ccd37a9f8 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -33,6 +33,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/retention" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/storage/block" @@ -399,10 +400,11 @@ func (s *dbShard) StreamWideEntry( ctx context.Context, id ident.ID, blockStart time.Time, + filter schema.WideEntryFilter, nsCtx namespace.Context, ) (block.StreamedWideEntry, error) { return s.DatabaseBlockRetriever.StreamWideEntry(ctx, s.shard, id, - blockStart, nsCtx) + blockStart, filter, nsCtx) } // IsBlockRetrievable implements series.QueryableBlockRetriever @@ -1142,13 +1144,14 @@ func (s *dbShard) FetchWideEntry( ctx context.Context, id ident.ID, blockStart time.Time, + filter schema.WideEntryFilter, nsCtx namespace.Context, ) (block.StreamedWideEntry, error) { retriever := s.seriesBlockRetriever opts := s.seriesOpts reader := series.NewReaderUsingRetriever(id, retriever, nil, nil, opts) - return reader.FetchWideEntry(ctx, blockStart, nsCtx) + return reader.FetchWideEntry(ctx, blockStart, filter, nsCtx) } // lookupEntryWithLock returns the entry for a given id while holding a read lock or a write lock. diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index 6e4e6a989f..3a6e948bdf 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -1613,26 +1613,30 @@ func TestShardFetchIndexChecksum(t *testing.T) { retriever := block.NewMockDatabaseBlockRetriever(ctrl) shard.setBlockRetriever(retriever) - checksum := xio.WideEntry{ - ID: ident.StringID("foo"), - MetadataChecksum: 5, - } + var ( + checksum = xio.WideEntry{ + ID: ident.StringID("foo"), + MetadataChecksum: 5, + } - wideEntry := block.NewMockStreamedWideEntry(ctrl) + wideEntry = block.NewMockStreamedWideEntry(ctrl) + ) retriever.EXPECT(). StreamWideEntry(ctx, shard.shard, ident.NewIDMatcher("foo"), - start, gomock.Any()).Return(wideEntry, nil).Times(2) + start, gomock.Any(), gomock.Any()).Return(wideEntry, nil).Times(2) // First call to RetrieveWideEntry is expected to error on retrieval wideEntry.EXPECT().RetrieveWideEntry(). Return(xio.WideEntry{}, errors.New("err")) - r, err := shard.FetchWideEntry(ctx, ident.StringID("foo"), start, namespace.Context{}) + r, err := shard.FetchWideEntry(ctx, ident.StringID("foo"), + start, nil, namespace.Context{}) require.NoError(t, err) _, err = r.RetrieveWideEntry() assert.EqualError(t, err, "err") wideEntry.EXPECT().RetrieveWideEntry().Return(checksum, nil) - r, err = shard.FetchWideEntry(ctx, ident.StringID("foo"), start, namespace.Context{}) + r, err = shard.FetchWideEntry(ctx, ident.StringID("foo"), + start, nil, namespace.Context{}) require.NoError(t, err) retrieved, err := r.RetrieveWideEntry() require.NoError(t, err) diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index bf9c30d220..27d3afc98a 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -35,6 +35,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/block" @@ -360,6 +361,20 @@ func (mr *MockDatabaseMockRecorder) WideQuery(ctx, namespace, query, start, shar return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WideQuery", reflect.TypeOf((*MockDatabase)(nil).WideQuery), ctx, namespace, query, start, shards, iterOpts) } +// BatchProcessWideQuery mocks base method +func (m *MockDatabase) BatchProcessWideQuery(ctx context.Context, n Namespace, query index.Query, batchProcessor IDBatchProcessor, opts index.WideQueryOptions) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BatchProcessWideQuery", ctx, n, query, batchProcessor, opts) + ret0, _ := ret[0].(error) + return ret0 +} + +// BatchProcessWideQuery indicates an expected call of BatchProcessWideQuery +func (mr *MockDatabaseMockRecorder) BatchProcessWideQuery(ctx, n, query, batchProcessor, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchProcessWideQuery", reflect.TypeOf((*MockDatabase)(nil).BatchProcessWideQuery), ctx, n, query, batchProcessor, opts) +} + // FetchBlocks mocks base method func (m *MockDatabase) FetchBlocks(ctx context.Context, namespace ident.ID, shard uint32, id ident.ID, starts []time.Time) ([]block.FetchBlockResult, error) { m.ctrl.T.Helper() @@ -785,6 +800,20 @@ func (mr *MockdatabaseMockRecorder) WideQuery(ctx, namespace, query, start, shar return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WideQuery", reflect.TypeOf((*Mockdatabase)(nil).WideQuery), ctx, namespace, query, start, shards, iterOpts) } +// BatchProcessWideQuery mocks base method +func (m *Mockdatabase) BatchProcessWideQuery(ctx context.Context, n Namespace, query index.Query, batchProcessor IDBatchProcessor, opts index.WideQueryOptions) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BatchProcessWideQuery", ctx, n, query, batchProcessor, opts) + ret0, _ := ret[0].(error) + return ret0 +} + +// BatchProcessWideQuery indicates an expected call of BatchProcessWideQuery +func (mr *MockdatabaseMockRecorder) BatchProcessWideQuery(ctx, n, query, batchProcessor, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchProcessWideQuery", reflect.TypeOf((*Mockdatabase)(nil).BatchProcessWideQuery), ctx, n, query, batchProcessor, opts) +} + // FetchBlocks mocks base method func (m *Mockdatabase) FetchBlocks(ctx context.Context, namespace ident.ID, shard uint32, id ident.ID, starts []time.Time) ([]block.FetchBlockResult, error) { m.ctrl.T.Helper() @@ -1166,19 +1195,33 @@ func (mr *MockNamespaceMockRecorder) DocRef(id interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DocRef", reflect.TypeOf((*MockNamespace)(nil).DocRef), id) } +// WideQueryIDs mocks base method +func (m *MockNamespace) WideQueryIDs(ctx context.Context, query index.Query, collector chan *ident.IDBatch, opts index.WideQueryOptions) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WideQueryIDs", ctx, query, collector, opts) + ret0, _ := ret[0].(error) + return ret0 +} + +// WideQueryIDs indicates an expected call of WideQueryIDs +func (mr *MockNamespaceMockRecorder) WideQueryIDs(ctx, query, collector, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WideQueryIDs", reflect.TypeOf((*MockNamespace)(nil).WideQueryIDs), ctx, query, collector, opts) +} + // FetchWideEntry mocks base method -func (m *MockNamespace) FetchWideEntry(ctx context.Context, id ident.ID, blockStart time.Time) (block.StreamedWideEntry, error) { +func (m *MockNamespace) FetchWideEntry(ctx context.Context, id ident.ID, blockStart time.Time, filter schema.WideEntryFilter) (block.StreamedWideEntry, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchWideEntry", ctx, id, blockStart) + ret := m.ctrl.Call(m, "FetchWideEntry", ctx, id, blockStart, filter) ret0, _ := ret[0].(block.StreamedWideEntry) ret1, _ := ret[1].(error) return ret0, ret1 } // FetchWideEntry indicates an expected call of FetchWideEntry -func (mr *MockNamespaceMockRecorder) FetchWideEntry(ctx, id, blockStart interface{}) *gomock.Call { +func (mr *MockNamespaceMockRecorder) FetchWideEntry(ctx, id, blockStart, filter interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWideEntry", reflect.TypeOf((*MockNamespace)(nil).FetchWideEntry), ctx, id, blockStart) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWideEntry", reflect.TypeOf((*MockNamespace)(nil).FetchWideEntry), ctx, id, blockStart, filter) } // MockdatabaseNamespace is a mock of databaseNamespace interface @@ -1373,19 +1416,33 @@ func (mr *MockdatabaseNamespaceMockRecorder) DocRef(id interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DocRef", reflect.TypeOf((*MockdatabaseNamespace)(nil).DocRef), id) } +// WideQueryIDs mocks base method +func (m *MockdatabaseNamespace) WideQueryIDs(ctx context.Context, query index.Query, collector chan *ident.IDBatch, opts index.WideQueryOptions) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WideQueryIDs", ctx, query, collector, opts) + ret0, _ := ret[0].(error) + return ret0 +} + +// WideQueryIDs indicates an expected call of WideQueryIDs +func (mr *MockdatabaseNamespaceMockRecorder) WideQueryIDs(ctx, query, collector, opts interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WideQueryIDs", reflect.TypeOf((*MockdatabaseNamespace)(nil).WideQueryIDs), ctx, query, collector, opts) +} + // FetchWideEntry mocks base method -func (m *MockdatabaseNamespace) FetchWideEntry(ctx context.Context, id ident.ID, blockStart time.Time) (block.StreamedWideEntry, error) { +func (m *MockdatabaseNamespace) FetchWideEntry(ctx context.Context, id ident.ID, blockStart time.Time, filter schema.WideEntryFilter) (block.StreamedWideEntry, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchWideEntry", ctx, id, blockStart) + ret := m.ctrl.Call(m, "FetchWideEntry", ctx, id, blockStart, filter) ret0, _ := ret[0].(block.StreamedWideEntry) ret1, _ := ret[1].(error) return ret0, ret1 } // FetchWideEntry indicates an expected call of FetchWideEntry -func (mr *MockdatabaseNamespaceMockRecorder) FetchWideEntry(ctx, id, blockStart interface{}) *gomock.Call { +func (mr *MockdatabaseNamespaceMockRecorder) FetchWideEntry(ctx, id, blockStart, filter interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWideEntry", reflect.TypeOf((*MockdatabaseNamespace)(nil).FetchWideEntry), ctx, id, blockStart) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWideEntry", reflect.TypeOf((*MockdatabaseNamespace)(nil).FetchWideEntry), ctx, id, blockStart, filter) } // Close mocks base method @@ -1487,20 +1544,6 @@ func (mr *MockdatabaseNamespaceMockRecorder) QueryIDs(ctx, query, opts interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryIDs", reflect.TypeOf((*MockdatabaseNamespace)(nil).QueryIDs), ctx, query, opts) } -// WideQueryIDs mocks base method -func (m *MockdatabaseNamespace) WideQueryIDs(ctx context.Context, query index.Query, collector chan *ident.IDBatch, opts index.WideQueryOptions) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "WideQueryIDs", ctx, query, collector, opts) - ret0, _ := ret[0].(error) - return ret0 -} - -// WideQueryIDs indicates an expected call of WideQueryIDs -func (mr *MockdatabaseNamespaceMockRecorder) WideQueryIDs(ctx, query, collector, opts interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WideQueryIDs", reflect.TypeOf((*MockdatabaseNamespace)(nil).WideQueryIDs), ctx, query, collector, opts) -} - // AggregateQuery mocks base method func (m *MockdatabaseNamespace) AggregateQuery(ctx context.Context, query index.Query, opts index.AggregationOptions) (index.AggregateQueryResult, error) { m.ctrl.T.Helper() @@ -2040,18 +2083,18 @@ func (mr *MockdatabaseShardMockRecorder) ReadEncoded(ctx, id, start, end, nsCtx } // FetchWideEntry mocks base method -func (m *MockdatabaseShard) FetchWideEntry(ctx context.Context, id ident.ID, blockStart time.Time, nsCtx namespace.Context) (block.StreamedWideEntry, error) { +func (m *MockdatabaseShard) FetchWideEntry(ctx context.Context, id ident.ID, blockStart time.Time, filter schema.WideEntryFilter, nsCtx namespace.Context) (block.StreamedWideEntry, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FetchWideEntry", ctx, id, blockStart, nsCtx) + ret := m.ctrl.Call(m, "FetchWideEntry", ctx, id, blockStart, filter, nsCtx) ret0, _ := ret[0].(block.StreamedWideEntry) ret1, _ := ret[1].(error) return ret0, ret1 } // FetchWideEntry indicates an expected call of FetchWideEntry -func (mr *MockdatabaseShardMockRecorder) FetchWideEntry(ctx, id, blockStart, nsCtx interface{}) *gomock.Call { +func (mr *MockdatabaseShardMockRecorder) FetchWideEntry(ctx, id, blockStart, filter, nsCtx interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWideEntry", reflect.TypeOf((*MockdatabaseShard)(nil).FetchWideEntry), ctx, id, blockStart, nsCtx) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWideEntry", reflect.TypeOf((*MockdatabaseShard)(nil).FetchWideEntry), ctx, id, blockStart, filter, nsCtx) } // FetchBlocks mocks base method @@ -4755,6 +4798,34 @@ func (mr *MockOptionsMockRecorder) OnColdFlush() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnColdFlush", reflect.TypeOf((*MockOptions)(nil).OnColdFlush)) } +// SetIterationOptions mocks base method +func (m *MockOptions) SetIterationOptions(arg0 index.IterationOptions) Options { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SetIterationOptions", arg0) + ret0, _ := ret[0].(Options) + return ret0 +} + +// SetIterationOptions indicates an expected call of SetIterationOptions +func (mr *MockOptionsMockRecorder) SetIterationOptions(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetIterationOptions", reflect.TypeOf((*MockOptions)(nil).SetIterationOptions), arg0) +} + +// IterationOptions mocks base method +func (m *MockOptions) IterationOptions() index.IterationOptions { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IterationOptions") + ret0, _ := ret[0].(index.IterationOptions) + return ret0 +} + +// IterationOptions indicates an expected call of IterationOptions +func (mr *MockOptionsMockRecorder) IterationOptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IterationOptions", reflect.TypeOf((*MockOptions)(nil).IterationOptions)) +} + // SetForceColdWritesEnabled mocks base method func (m *MockOptions) SetForceColdWritesEnabled(value bool) Options { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index 20705b0abe..81af6d2e37 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -31,6 +31,7 @@ import ( "github.com/m3db/m3/src/dbnode/persist" "github.com/m3db/m3/src/dbnode/persist/fs" "github.com/m3db/m3/src/dbnode/persist/fs/commitlog" + "github.com/m3db/m3/src/dbnode/persist/schema" "github.com/m3db/m3/src/dbnode/runtime" "github.com/m3db/m3/src/dbnode/sharding" "github.com/m3db/m3/src/dbnode/storage/block" @@ -186,6 +187,17 @@ type Database interface { iterOpts index.IterationOptions, ) ([]xio.WideEntry, error) // FIXME: change when exact type known. + // BatchProcessWideQuery runs the given query against the namespace index, + // iterating in a batchwise fashion across all matching IDs, applying the given + // IDBatchProcessor batch processing function to each ID discovered. + BatchProcessWideQuery( + ctx context.Context, + n Namespace, + query index.Query, + batchProcessor IDBatchProcessor, + opts index.WideQueryOptions, + ) error + // FetchBlocks retrieves data blocks for a given id and a list of block // start times. FetchBlocks( @@ -289,12 +301,22 @@ type Namespace interface { // DocRef returns the doc if already present in a namespace shard. DocRef(id ident.ID) (doc.Document, bool, error) - // FetchWideEntry retrieves wide entry for an ID for the + // WideQueryIDs resolves the given query into known IDs in s streaming + // fashion. + WideQueryIDs( + ctx context.Context, + query index.Query, + collector chan *ident.IDBatch, + opts index.WideQueryOptions, + ) error + + // FetchWideEntry retrieves the wide entry for an ID for the // block at time start. FetchWideEntry( ctx context.Context, id ident.ID, blockStart time.Time, + filter schema.WideEntryFilter, ) (block.StreamedWideEntry, error) } @@ -358,14 +380,6 @@ type databaseNamespace interface { opts index.QueryOptions, ) (index.QueryResult, error) - // WideQueryIDs resolves the given query into known IDs in s streaming fashion. - WideQueryIDs( - ctx context.Context, - query index.Query, - collector chan *ident.IDBatch, - opts index.WideQueryOptions, - ) error - // AggregateQuery resolves the given query into aggregated tags. AggregateQuery( ctx context.Context, @@ -542,6 +556,7 @@ type databaseShard interface { ctx context.Context, id ident.ID, blockStart time.Time, + filter schema.WideEntryFilter, nsCtx namespace.Context, ) (block.StreamedWideEntry, error) @@ -1251,6 +1266,12 @@ type Options interface { // OnColdFlush returns the on cold flush processor. OnColdFlush() OnColdFlush + // SetIterationOptions sets iteration options. + SetIterationOptions(index.IterationOptions) Options + + // IterationOptions returns iteration options. + IterationOptions() index.IterationOptions + // SetForceColdWritesEnabled sets options for forcing cold writes. SetForceColdWritesEnabled(value bool) Options