From 54474fd41779a9e89a77c3f0dfdaeedd7b4123cb Mon Sep 17 00:00:00 2001 From: Gediminas Guoba Date: Mon, 1 Feb 2021 12:03:05 +0200 Subject: [PATCH 1/5] [dtest] endpoint to fetch tagged (#3138) --- .../docker-integration-tests/simple/test.sh | 30 ++++++++----------- .../dtest/docker/harness/resources/dbnode.go | 18 +++++++++++ src/dbnode/integration/client.go | 8 +++++ 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/scripts/docker-integration-tests/simple/test.sh b/scripts/docker-integration-tests/simple/test.sh index d85be4fa92..9c32d50973 100755 --- a/scripts/docker-integration-tests/simple/test.sh +++ b/scripts/docker-integration-tests/simple/test.sh @@ -143,24 +143,18 @@ curl -vvvsS -X POST 0.0.0.0:9003/writetagged -d '{ }' echo "Read data" -queryResult=$(curl -sSf -X POST 0.0.0.0:9003/query -d '{ - "namespace": "unagg", - "query": { - "regexp": { - "field": "city", - "regexp": ".*" - } - }, - "rangeStart": 0, - "rangeEnd":'"$(date +"%s")"' -}' | jq '.results | length') - -if [ "$queryResult" -lt 1 ]; then - echo "Result not found" - exit 1 -else - echo "Result found" -fi +ATTEMPTS=3 TIMEOUT=1 retry_with_backoff \ + '[ "$(curl -sSf -X POST 0.0.0.0:9003/query -d "{ + \"namespace\": \"unagg\", + \"query\": { + \"regexp\": { + \"field\": \"city\", + \"regexp\": \".*\" + } + }, + \"rangeStart\": 0, + \"rangeEnd\":'\"$(date +\"%s\")\"' + }" | jq ".results | length")" == "1" ]' echo "Deleting placement" curl -vvvsSf -X DELETE 0.0.0.0:7201/api/v1/services/m3db/placement diff --git a/src/cmd/tools/dtest/docker/harness/resources/dbnode.go b/src/cmd/tools/dtest/docker/harness/resources/dbnode.go index 2d12305ad7..f9598e5829 100644 --- a/src/cmd/tools/dtest/docker/harness/resources/dbnode.go +++ b/src/cmd/tools/dtest/docker/harness/resources/dbnode.go @@ -101,6 +101,8 @@ type Node interface { AggregateTiles(req *rpc.AggregateTilesRequest) (int64, error) // Fetch fetches datapoints. Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error) + // FetchTagged fetches datapoints by tag. + FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) // Exec executes the given commands on the node container, returning // stdout and stderr from the container. Exec(commands ...string) (string, error) @@ -267,6 +269,22 @@ func (c *dbNode) Fetch(req *rpc.FetchRequest) (*rpc.FetchResult_, error) { return dps, nil } +func (c *dbNode) FetchTagged(req *rpc.FetchTaggedRequest) (*rpc.FetchTaggedResult_, error) { + if c.resource.closed { + return nil, errClosed + } + + logger := c.resource.logger.With(zapMethod("fetchtagged")) + result, err := c.tchanClient.TChannelClientFetchTagged(timeout, req) + if err != nil { + logger.Error("could not fetch", zap.Error(err)) + return nil, err + } + + logger.Info("fetched", zap.Int("series_count", len(result.GetElements()))) + return result, nil +} + func (c *dbNode) Restart() error { if c.resource.closed { return errClosed diff --git a/src/dbnode/integration/client.go b/src/dbnode/integration/client.go index 287d367dba..f40b293c5d 100644 --- a/src/dbnode/integration/client.go +++ b/src/dbnode/integration/client.go @@ -129,6 +129,14 @@ func (client *TestTChannelClient) TChannelClientFetch( return client.node.Fetch(ctx, req) } +// TChannelClientFetchTagged fulfills a fetch by tag request using a tchannel client. +func (client *TestTChannelClient) TChannelClientFetchTagged( + timeout time.Duration, req *rpc.FetchTaggedRequest, +) (*rpc.FetchTaggedResult_, error) { + ctx, _ := thrift.NewContext(timeout) + return client.node.FetchTagged(ctx, req) +} + // TChannelClientAggregateTiles runs a request for AggregateTiles. func (client *TestTChannelClient) TChannelClientAggregateTiles( timeout time.Duration, req *rpc.AggregateTilesRequest, From c8ce2658392df6e5bd4760ad5f3065e5cf27d783 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Justas=20=C4=8Cerniauskas?= Date: Mon, 1 Feb 2021 16:59:56 +0200 Subject: [PATCH 2/5] [aggregator] Prevent tcp client panic on nil placement (#3139) --- src/aggregator/client/tcp_client.go | 17 +++++++++++++++- src/aggregator/client/tcp_client_test.go | 25 ++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/src/aggregator/client/tcp_client.go b/src/aggregator/client/tcp_client.go index 38209e20f7..b2484e055e 100644 --- a/src/aggregator/client/tcp_client.go +++ b/src/aggregator/client/tcp_client.go @@ -21,6 +21,7 @@ package client import ( + "errors" "fmt" "math" "time" @@ -39,7 +40,11 @@ import ( xerrors "github.com/m3db/m3/src/x/errors" ) -var _ AdminClient = (*TCPClient)(nil) +var ( + _ AdminClient = (*TCPClient)(nil) + + errNilPlacement = errors.New("placement is nil") +) // TCPClient sends metrics to M3 Aggregator via over custom TCP protocol. type TCPClient struct { @@ -229,6 +234,9 @@ func (c *TCPClient) ActivePlacement() (placement.Placement, int, error) { return nil, 0, err } defer onStagedPlacementDoneFn() + if stagedPlacement == nil { + return nil, 0, errNilPlacement + } placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement() if err != nil { @@ -247,6 +255,9 @@ func (c *TCPClient) ActivePlacementVersion() (int, error) { return 0, err } defer onStagedPlacementDoneFn() + if stagedPlacement == nil { + return 0, errNilPlacement + } return stagedPlacement.Version(), nil } @@ -274,6 +285,10 @@ func (c *TCPClient) write( if err != nil { return err } + if stagedPlacement == nil { + onStagedPlacementDoneFn() + return errNilPlacement + } placement, onPlacementDoneFn, err := stagedPlacement.ActivePlacement() if err != nil { onStagedPlacementDoneFn() diff --git a/src/aggregator/client/tcp_client_test.go b/src/aggregator/client/tcp_client_test.go index a6f4a4af36..3990e7ada3 100644 --- a/src/aggregator/client/tcp_client_test.go +++ b/src/aggregator/client/tcp_client_test.go @@ -241,6 +241,31 @@ func TestTCPClientWriteUntimedMetricActiveStagedPlacementError(t *testing.T) { } } +func TestTCPClientWriteUntimedMetricActiveStagedPlacementNil(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + watcher := placement.NewMockStagedPlacementWatcher(ctrl) + watcher.EXPECT().ActiveStagedPlacement(). + Return(nil, func() {}, nil). + MinTimes(1) + c := mustNewTestTCPClient(t, testOptions()) + c.placementWatcher = watcher + + for _, input := range []unaggregated.MetricUnion{testCounter, testBatchTimer, testGauge} { + var err error + switch input.Type { + case metric.CounterType: + err = c.WriteUntimedCounter(input.Counter(), testStagedMetadatas) + case metric.TimerType: + err = c.WriteUntimedBatchTimer(input.BatchTimer(), testStagedMetadatas) + case metric.GaugeType: + err = c.WriteUntimedGauge(input.Gauge(), testStagedMetadatas) + } + require.Equal(t, errNilPlacement, err) + } +} + func TestTCPClientWriteUntimedMetricActivePlacementError(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() From 3a42d9e368146ebc38c49902c808b5cee6e4f930 Mon Sep 17 00:00:00 2001 From: arnikola Date: Mon, 1 Feb 2021 13:22:45 -0500 Subject: [PATCH 3/5] [dbnode] Add source propagation to aggregate query (#3153) --- src/dbnode/network/server/tchannelthrift/cluster/service.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/dbnode/network/server/tchannelthrift/cluster/service.go b/src/dbnode/network/server/tchannelthrift/cluster/service.go index 0e2548acdd..0edf833963 100644 --- a/src/dbnode/network/server/tchannelthrift/cluster/service.go +++ b/src/dbnode/network/server/tchannelthrift/cluster/service.go @@ -166,6 +166,7 @@ func (s *service) Query(tctx thrift.Context, req *rpc.QueryRequest) (*rpc.QueryR if err := results.Err(); err != nil { return nil, convert.ToRPCError(err) } + return result, nil } @@ -281,6 +282,10 @@ func (s *service) Aggregate(ctx thrift.Context, req *rpc.AggregateQueryRequest) return nil, tterrors.NewBadRequestError(err) } + if len(req.Source) > 0 { + opts.Source = req.Source + } + iter, metadata, err := session.Aggregate(ns, query, opts) if err != nil { return nil, convert.ToRPCError(err) From 7f1ce15d55c84359b3c70ff74838c40325650dbf Mon Sep 17 00:00:00 2001 From: nate Date: Mon, 1 Feb 2021 13:58:41 -0500 Subject: [PATCH 4/5] [tests] Add option to skip setup for docker integration tests (#3146) --- scripts/docker-integration-tests/run.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/docker-integration-tests/run.sh b/scripts/docker-integration-tests/run.sh index 12f08e87bf..480a56b825 100755 --- a/scripts/docker-integration-tests/run.sh +++ b/scripts/docker-integration-tests/run.sh @@ -42,7 +42,9 @@ if ! command -v nc && [[ "$BUILDKITE" == "true" ]]; then trap cleanup_nc EXIT fi -scripts/docker-integration-tests/setup.sh +if [[ -z "$SKIP_SETUP" ]] || [[ "$SKIP_SETUP" == "false" ]]; then + scripts/docker-integration-tests/setup.sh +fi NUM_TESTS=${#TESTS[@]} MIN_IDX=$((NUM_TESTS*BUILDKITE_PARALLEL_JOB/BUILDKITE_PARALLEL_JOB_COUNT)) From f23e2d1308d319f7aa044727f89d89e3a8da95ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linas=20Med=C5=BEi=C5=ABnas?= Date: Tue, 2 Feb 2021 09:06:06 +0200 Subject: [PATCH 5/5] [dbnode] Remove unused Shard.ScanData method (#3148) --- src/dbnode/generated/mocks/generate.go | 2 +- src/dbnode/persist/fs/fs_mock.go | 53 +---------------------- src/dbnode/persist/fs/types.go | 8 ---- src/dbnode/storage/shard.go | 60 -------------------------- src/dbnode/storage/shard_test.go | 47 -------------------- src/dbnode/storage/storage_mock.go | 28 ------------ src/dbnode/storage/types.go | 7 --- 7 files changed, 3 insertions(+), 202 deletions(-) diff --git a/src/dbnode/generated/mocks/generate.go b/src/dbnode/generated/mocks/generate.go index 217033516a..de23a9efbf 100644 --- a/src/dbnode/generated/mocks/generate.go +++ b/src/dbnode/generated/mocks/generate.go @@ -20,7 +20,7 @@ // mockgen rules for generating mocks for exported interfaces (reflection mode) -//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter,DataEntryProcessor | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go" +//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go" //go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go" //go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest" //go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go" diff --git a/src/dbnode/persist/fs/fs_mock.go b/src/dbnode/persist/fs/fs_mock.go index 7b526c8f6a..e29e80b30a 100644 --- a/src/dbnode/persist/fs/fs_mock.go +++ b/src/dbnode/persist/fs/fs_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter,DataEntryProcessor) +// Source: github.com/m3db/m3/src/dbnode/persist/fs (interfaces: DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter) -// Copyright (c) 2020 Uber Technologies, Inc. +// Copyright (c) 2021 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -1417,52 +1417,3 @@ func (mr *MockStreamingWriterMockRecorder) WriteAll(arg0, arg1, arg2, arg3 inter mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteAll", reflect.TypeOf((*MockStreamingWriter)(nil).WriteAll), arg0, arg1, arg2, arg3) } - -// MockDataEntryProcessor is a mock of DataEntryProcessor interface -type MockDataEntryProcessor struct { - ctrl *gomock.Controller - recorder *MockDataEntryProcessorMockRecorder -} - -// MockDataEntryProcessorMockRecorder is the mock recorder for MockDataEntryProcessor -type MockDataEntryProcessorMockRecorder struct { - mock *MockDataEntryProcessor -} - -// NewMockDataEntryProcessor creates a new mock instance -func NewMockDataEntryProcessor(ctrl *gomock.Controller) *MockDataEntryProcessor { - mock := &MockDataEntryProcessor{ctrl: ctrl} - mock.recorder = &MockDataEntryProcessorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use -func (m *MockDataEntryProcessor) EXPECT() *MockDataEntryProcessorMockRecorder { - return m.recorder -} - -// ProcessEntry mocks base method -func (m *MockDataEntryProcessor) ProcessEntry(arg0 StreamedDataEntry) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ProcessEntry", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// ProcessEntry indicates an expected call of ProcessEntry -func (mr *MockDataEntryProcessorMockRecorder) ProcessEntry(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessEntry", reflect.TypeOf((*MockDataEntryProcessor)(nil).ProcessEntry), arg0) -} - -// SetEntriesCount mocks base method -func (m *MockDataEntryProcessor) SetEntriesCount(arg0 int) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "SetEntriesCount", arg0) -} - -// SetEntriesCount indicates an expected call of SetEntriesCount -func (mr *MockDataEntryProcessorMockRecorder) SetEntriesCount(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetEntriesCount", reflect.TypeOf((*MockDataEntryProcessor)(nil).SetEntriesCount), arg0) -} diff --git a/src/dbnode/persist/fs/types.go b/src/dbnode/persist/fs/types.go index 34879c623f..ca9d9383db 100644 --- a/src/dbnode/persist/fs/types.go +++ b/src/dbnode/persist/fs/types.go @@ -704,11 +704,3 @@ type StreamedMetadataEntry struct { // NewReaderFn creates a new DataFileSetReader. type NewReaderFn func(bytesPool pool.CheckedBytesPool, opts Options) (DataFileSetReader, error) - -// DataEntryProcessor processes StreamedDataEntries. -type DataEntryProcessor interface { - // SetEntriesCount sets the number of entries to be processed. - SetEntriesCount(int) - // ProcessEntry processes a single StreamedDataEntry. - ProcessEntry(StreamedDataEntry) error -} diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index a51bd9c5ce..ea91bebb52 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -2839,66 +2839,6 @@ func (s *dbShard) OpenStreamingReader(blockStart time.Time) (fs.DataFileSetReade return reader, nil } -func (s *dbShard) ScanData( - blockStart time.Time, - processor fs.DataEntryProcessor, -) error { - latestVolume, err := s.LatestVolume(blockStart) - if err != nil { - return err - } - - reader, err := s.newReaderFn(s.opts.BytesPool(), s.opts.CommitLogOptions().FilesystemOptions()) - if err != nil { - return err - } - - openOpts := fs.DataReaderOpenOptions{ - Identifier: fs.FileSetFileIdentifier{ - Namespace: s.namespace.ID(), - Shard: s.ID(), - BlockStart: blockStart, - VolumeIndex: latestVolume, - }, - FileSetType: persist.FileSetFlushType, - StreamingEnabled: true, - } - - if err := reader.Open(openOpts); err != nil { - return err - } - - readEntriesErr := s.scanDataWithReader(reader, processor) - // Always close the reader regardless of if failed, but - // make sure to propagate if an error occurred closing the reader too. - readCloseErr := reader.Close() - if err := readEntriesErr; err != nil { - return readEntriesErr - } - return readCloseErr -} - -func (s *dbShard) scanDataWithReader( - reader fs.DataFileSetReader, - processor fs.DataEntryProcessor, -) error { - processor.SetEntriesCount(reader.Entries()) - - for { - entry, err := reader.StreamingRead() - if err != nil { - if errors.Is(err, io.EOF) { - return nil - } - return err - } - - if err := processor.ProcessEntry(entry); err != nil { - return err - } - } -} - func (s *dbShard) logFlushResult(r dbShardFlushResult) { s.logger.Debug("shard flush outcome", zap.Uint32("shard", s.ID()), diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go index f957b4f35b..f4abff4b11 100644 --- a/src/dbnode/storage/shard_test.go +++ b/src/dbnode/storage/shard_test.go @@ -23,7 +23,6 @@ package storage import ( "errors" "fmt" - "io" "io/ioutil" "os" "strconv" @@ -1991,52 +1990,6 @@ func TestOpenStreamingReader(t *testing.T) { require.NoError(t, err) } -func TestShardScan(t *testing.T) { - ctrl := xtest.NewController(t) - defer ctrl.Finish() - - var ( - blockSize = time.Hour - start = time.Now().Truncate(blockSize) - testOpts = DefaultTestOptions() - ) - - shard := testDatabaseShard(t, testOpts) - defer assert.NoError(t, shard.Close()) - - shardEntries := []fs.StreamedDataEntry{ - { - ID: ident.BytesID("id1"), - EncodedTags: ts.EncodedTags("tags1"), - Data: []byte{1}, - DataChecksum: 11, - }, - { - ID: ident.BytesID("id2"), - EncodedTags: ts.EncodedTags("tags2"), - Data: []byte{2}, - DataChecksum: 22, - }, - } - - processor := fs.NewMockDataEntryProcessor(ctrl) - processor.EXPECT().SetEntriesCount(len(shardEntries)) - - reader, _ := getMockReader(ctrl, t, shard, start, nil) - reader.EXPECT().Entries().Return(len(shardEntries)) - for _, entry := range shardEntries { - reader.EXPECT().StreamingRead().Return(entry, nil) - processor.EXPECT().ProcessEntry(entry) - } - reader.EXPECT().StreamingRead().Return(fs.StreamedDataEntry{}, io.EOF) - - shard.newReaderFn = func(pool.CheckedBytesPool, fs.Options) (fs.DataFileSetReader, error) { - return reader, nil - } - - require.NoError(t, shard.ScanData(start, processor)) -} - func getMockReader( ctrl *gomock.Controller, t *testing.T, diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go index ee619f0e00..e4e25101df 100644 --- a/src/dbnode/storage/storage_mock.go +++ b/src/dbnode/storage/storage_mock.go @@ -1933,20 +1933,6 @@ func (mr *MockShardMockRecorder) BootstrapState() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapState", reflect.TypeOf((*MockShard)(nil).BootstrapState)) } -// ScanData mocks base method -func (m *MockShard) ScanData(blockStart time.Time, processor fs.DataEntryProcessor) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ScanData", blockStart, processor) - ret0, _ := ret[0].(error) - return ret0 -} - -// ScanData indicates an expected call of ScanData -func (mr *MockShardMockRecorder) ScanData(blockStart, processor interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScanData", reflect.TypeOf((*MockShard)(nil).ScanData), blockStart, processor) -} - // OpenStreamingReader mocks base method func (m *MockShard) OpenStreamingReader(blockStart time.Time) (fs.DataFileSetReader, error) { m.ctrl.T.Helper() @@ -2041,20 +2027,6 @@ func (mr *MockdatabaseShardMockRecorder) BootstrapState() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BootstrapState", reflect.TypeOf((*MockdatabaseShard)(nil).BootstrapState)) } -// ScanData mocks base method -func (m *MockdatabaseShard) ScanData(blockStart time.Time, processor fs.DataEntryProcessor) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ScanData", blockStart, processor) - ret0, _ := ret[0].(error) - return ret0 -} - -// ScanData indicates an expected call of ScanData -func (mr *MockdatabaseShardMockRecorder) ScanData(blockStart, processor interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScanData", reflect.TypeOf((*MockdatabaseShard)(nil).ScanData), blockStart, processor) -} - // OpenStreamingReader mocks base method func (m *MockdatabaseShard) OpenStreamingReader(blockStart time.Time) (fs.DataFileSetReader, error) { m.ctrl.T.Helper() diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go index e39b8169d4..6463eee721 100644 --- a/src/dbnode/storage/types.go +++ b/src/dbnode/storage/types.go @@ -509,13 +509,6 @@ type Shard interface { // BootstrapState returns the shards' bootstrap state. BootstrapState() BootstrapState - // ScanData performs a "full table scan" on the given block, - // calling processor function on every entry read. - ScanData( - blockStart time.Time, - processor fs.DataEntryProcessor, - ) error - // OpenStreamingDataReader creates and opens a streaming fs.DataFileSetReader // on the latest volume of the given block. OpenStreamingReader(blockStart time.Time) (fs.DataFileSetReader, error)