diff --git a/src/dbnode/storage/namespace.go b/src/dbnode/storage/namespace.go
index 20676e51e5..fd05e2f2d4 100644
--- a/src/dbnode/storage/namespace.go
+++ b/src/dbnode/storage/namespace.go
@@ -1721,6 +1721,7 @@ func (n *dbNamespace) aggregateTiles(
         targetBlockStart   = opts.Start.Truncate(targetBlockSize)
         sourceBlockSize    = sourceNs.Options().RetentionOptions().BlockSize()
         lastSourceBlockEnd = opts.End.Truncate(sourceBlockSize)
+        processedShards    = opts.InsOptions.MetricsScope().Counter("processed-shards")
     )

     if targetBlockStart.Add(targetBlockSize).Before(lastSourceBlockEnd) {
@@ -1732,24 +1733,6 @@ func (n *dbNamespace) aggregateTiles(
         return 0, errNamespaceNotBootstrapped
     }

-    var (
-        processedShards   = opts.InsOptions.MetricsScope().Counter("processed-shards")
-        targetShards      = n.OwnedShards()
-        bytesPool         = sourceNs.StorageOptions().BytesPool()
-        fsOptions         = sourceNs.StorageOptions().CommitLogOptions().FilesystemOptions()
-        blockReaders      []fs.DataFileSetReader
-        sourceBlockStarts []time.Time
-    )
-
-    for sourceBlockStart := targetBlockStart; sourceBlockStart.Before(lastSourceBlockEnd); sourceBlockStart = sourceBlockStart.Add(sourceBlockSize) {
-        reader, err := fs.NewReader(bytesPool, fsOptions)
-        if err != nil {
-            return 0, err
-        }
-        sourceBlockStarts = append(sourceBlockStarts, sourceBlockStart)
-        blockReaders = append(blockReaders, reader)
-    }
-
     // Cold flusher builds the reverse index for target (current) ns.
     onColdFlushNs, err := n.opts.OnColdFlush().ColdFlushNamespace(n)
     if err != nil {
@@ -1764,37 +1747,15 @@ func (n *dbNamespace) aggregateTiles(
         if aggregationSuccess {
             return
         }
-        // Abort buildling reverse index if aggregation fails.
+        // Abort building reverse index if aggregation fails.
         if err := onColdFlushNs.Abort(); err != nil {
             n.log.Error("error aborting cold flush",
                 zap.Stringer("sourceNs", sourceNs.ID()), zap.Error(err))
         }
     }()

-    for _, targetShard := range targetShards {
-        sourceShard, _, err := sourceNs.ReadableShardAt(targetShard.ID())
-        if err != nil {
-            return 0, fmt.Errorf("no matching shard in source namespace %s: %v", sourceNs.ID(), err)
-        }
-
-        sourceBlockVolumes := make([]shardBlockVolume, 0, len(sourceBlockStarts))
-        for _, sourceBlockStart := range sourceBlockStarts {
-            latestVolume, err := sourceShard.LatestVolume(sourceBlockStart)
-            if err != nil {
-                n.log.Error("error getting shards latest volume",
-                    zap.Error(err), zap.Uint32("shard", sourceShard.ID()), zap.Time("blockStart", sourceBlockStart))
-                return 0, err
-            }
-            sourceBlockVolumes = append(sourceBlockVolumes, shardBlockVolume{sourceBlockStart, latestVolume})
-        }
-
-        writer, err := fs.NewStreamingWriter(n.opts.CommitLogOptions().FilesystemOptions())
-        if err != nil {
-            return 0, err
-        }
-
+    for _, targetShard := range n.OwnedShards() {
         shardProcessedTileCount, err := targetShard.AggregateTiles(
-            ctx, sourceNs, n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes,
-            onColdFlushNs, opts)
+            ctx, sourceNs, n, targetShard.ID(), onColdFlushNs, opts)

         processedTileCount += shardProcessedTileCount
         processedShards.Inc(1)
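Note on the namespace hunks above: per-source-block `fs.NewReader` construction and per-shard `fs.NewStreamingWriter` setup are gone from the driver. `aggregateTiles` now only walks `n.OwnedShards()` and delegates, and the `processed-shards` counter is created up front from `opts.InsOptions`. A minimal sketch of observing that counter through a tally test scope; the wiring below is illustrative, not part of the patch:

    testScope := tally.NewTestScope("", nil)
    insOpts := instrument.NewOptions().SetMetricsScope(testScope)
    opts := AggregateTilesOptions{
        Start:      start,                        // illustrative block start
        End:        start.Add(targetBlockSize),
        Step:       time.Minute,                  // illustrative step
        InsOptions: insOpts,
    }
    // ... run targetNs.AggregateTiles(ctx, sourceNs, opts) ...
    // Assuming tally's "name+tags" snapshot key format, the counter should
    // show one increment per owned shard:
    _ = testScope.Snapshot().Counters()["processed-shards+"]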
diff --git a/src/dbnode/storage/namespace_test.go b/src/dbnode/storage/namespace_test.go
index 01f2f6d798..a91863aa7e 100644
--- a/src/dbnode/storage/namespace_test.go
+++ b/src/dbnode/storage/namespace_test.go
@@ -1426,7 +1426,8 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
         sourceNsID = ident.StringID("source")
         targetNsID = ident.StringID("target")
         start      = time.Now().Truncate(time.Hour)
-        opts       = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)}
+        insOpts    = instrument.NewOptions()
+        opts       = AggregateTilesOptions{Start: start, End: start.Add(time.Hour), InsOptions: insOpts}
     )

     sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions())
@@ -1445,22 +1446,21 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
 }

 func TestNamespaceAggregateTiles(t *testing.T) {
-    ctrl := gomock.NewController(t)
+    ctrl := xtest.NewController(t)
     defer ctrl.Finish()

     ctx := context.NewContext()
     defer ctx.Close()

     var (
-        sourceNsID             = ident.StringID("source")
-        targetNsID             = ident.StringID("target")
-        sourceBlockSize        = time.Hour
-        targetBlockSize        = 2 * time.Hour
-        start                  = time.Now().Truncate(targetBlockSize)
-        secondSourceBlockStart = start.Add(sourceBlockSize)
-        shard0ID               uint32 = 10
-        shard1ID               uint32 = 20
-        insOpts                = instrument.NewOptions()
+        sourceNsID      = ident.StringID("source")
+        targetNsID      = ident.StringID("target")
+        sourceBlockSize = time.Hour
+        targetBlockSize = 2 * time.Hour
+        start           = time.Now().Truncate(targetBlockSize)
+        shard0ID        = uint32(10)
+        shard1ID        = uint32(20)
+        insOpts         = instrument.NewOptions()
     )

     opts, err := NewAggregateTilesOptions(start, start.Add(targetBlockSize), time.Second, targetNsID, insOpts)
@@ -1485,42 +1485,20 @@ func TestNamespaceAggregateTiles(t *testing.T) {
     mockOnColdFlush.EXPECT().ColdFlushNamespace(gomock.Any()).Return(mockOnColdFlushNs, nil)
     targetNs.opts = targetNs.opts.SetOnColdFlush(mockOnColdFlush)

-    sourceShard0 := NewMockdatabaseShard(ctrl)
-    sourceShard1 := NewMockdatabaseShard(ctrl)
-    sourceNs.shards[0] = sourceShard0
-    sourceNs.shards[1] = sourceShard1
-
-    sourceShard0.EXPECT().ID().Return(shard0ID)
-    sourceShard0.EXPECT().IsBootstrapped().Return(true)
-    sourceShard0.EXPECT().LatestVolume(start).Return(5, nil)
-    sourceShard0.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(15, nil)
-
-    sourceShard1.EXPECT().ID().Return(shard1ID)
-    sourceShard1.EXPECT().IsBootstrapped().Return(true)
-    sourceShard1.EXPECT().LatestVolume(start).Return(7, nil)
-    sourceShard1.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(17, nil)
-
     targetShard0 := NewMockdatabaseShard(ctrl)
     targetShard1 := NewMockdatabaseShard(ctrl)
     targetNs.shards[0] = targetShard0
     targetNs.shards[1] = targetShard1

-    targetShard0.EXPECT().ID().Return(uint32(0))
-    targetShard1.EXPECT().ID().Return(uint32(1))
-
-    sourceBlockVolumes0 := []shardBlockVolume{{start, 5}, {secondSourceBlockStart, 15}}
-    sourceBlockVolumes1 := []shardBlockVolume{{start, 7}, {secondSourceBlockStart, 17}}
+    targetShard0.EXPECT().ID().Return(shard0ID)
+    targetShard1.EXPECT().ID().Return(shard1ID)

     targetShard0.EXPECT().
-        AggregateTiles(
-            ctx, sourceNs, targetNs, shard0ID, gomock.Len(2), gomock.Any(),
-            sourceBlockVolumes0, gomock.Any(), opts).
+        AggregateTiles(ctx, sourceNs, targetNs, shard0ID, mockOnColdFlushNs, opts).
         Return(int64(3), nil)

     targetShard1.EXPECT().
-        AggregateTiles(
-            ctx, sourceNs, targetNs, shard1ID, gomock.Len(2), gomock.Any(),
-            sourceBlockVolumes1, gomock.Any(), opts).
+        AggregateTiles(ctx, sourceNs, targetNs, shard1ID, mockOnColdFlushNs, opts).
         Return(int64(2), nil)

     processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts)
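Even the fail-until-bootstrapped test now supplies `InsOptions`: the `processed-shards` counter is created before the bootstrap check, so `opts.InsOptions.MetricsScope()` is dereferenced on every call. A zero-value options struct would panic; sketch, assuming `InsOptions` is an `instrument.Options` interface value:

    opts := AggregateTilesOptions{Start: start, End: start.Add(time.Hour)}
    // opts.InsOptions is a nil interface here, so
    // opts.InsOptions.MetricsScope() panics with a nil pointer dereference
    // before errNamespaceNotBootstrapped is ever returned.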
diff --git a/src/dbnode/storage/options.go b/src/dbnode/storage/options.go
index 61107ce494..ffb51f3163 100644
--- a/src/dbnode/storage/options.go
+++ b/src/dbnode/storage/options.go
@@ -946,10 +946,8 @@ func (a *noopTileAggregator) AggregateTiles(
     ctx context.Context,
     sourceNs, targetNs Namespace,
     shardID uint32,
-    blockReaders []fs.DataFileSetReader,
-    writer fs.StreamingWriter,
     onFlushSeries persist.OnFlushSeries,
     opts AggregateTilesOptions,
-) (int64, error) {
-    return 0, nil
+) (int64, int, error) {
+    return 0, 0, nil
 }
diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go
index 8c5ebbe5e6..e39e7fc19d 100644
--- a/src/dbnode/storage/shard.go
+++ b/src/dbnode/storage/shard.go
@@ -2669,99 +2669,16 @@ func (s *dbShard) AggregateTiles(
     ctx context.Context,
     sourceNs, targetNs Namespace,
     shardID uint32,
-    blockReaders []fs.DataFileSetReader,
-    writer fs.StreamingWriter,
-    sourceBlockVolumes []shardBlockVolume,
     onFlushSeries persist.OnFlushSeries,
     opts AggregateTilesOptions,
 ) (int64, error) {
-    if len(blockReaders) != len(sourceBlockVolumes) {
-        return 0, fmt.Errorf(
-            "blockReaders and sourceBlockVolumes length mismatch (%d != %d)",
-            len(blockReaders),
-            len(sourceBlockVolumes))
-    }
-
-    openBlockReaders := make([]fs.DataFileSetReader, 0, len(blockReaders))
-    defer func() {
-        for _, reader := range openBlockReaders {
-            if err := reader.Close(); err != nil {
-                s.logger.Error("could not close DataFileSetReader", zap.Error(err))
-            }
-        }
-    }()
-
-    var (
-        sourceNsID         = sourceNs.ID()
-        plannedSeriesCount = 1
-    )
-
-    for sourceBlockPos, blockReader := range blockReaders {
-        sourceBlockVolume := sourceBlockVolumes[sourceBlockPos]
-        openOpts := fs.DataReaderOpenOptions{
-            Identifier: fs.FileSetFileIdentifier{
-                Namespace:   sourceNsID,
-                Shard:       shardID,
-                BlockStart:  sourceBlockVolume.blockStart,
-                VolumeIndex: sourceBlockVolume.latestVolume,
-            },
-            FileSetType:      persist.FileSetFlushType,
-            StreamingEnabled: true,
-        }
-
-        if err := blockReader.Open(openOpts); err != nil {
-            if err == fs.ErrCheckpointFileNotFound {
-                // A very recent source block might not have been flushed yet.
-                continue
-            }
-            s.logger.Error("blockReader.Open",
-                zap.Error(err),
-                zap.Time("blockStart", sourceBlockVolume.blockStart),
-                zap.Int("volumeIndex", sourceBlockVolume.latestVolume))
-            return 0, err
-        }
-
-        entries := blockReader.Entries()
-        if entries > plannedSeriesCount {
-            plannedSeriesCount = entries
-        }
-
-        openBlockReaders = append(openBlockReaders, blockReader)
-    }
-
-    latestTargetVolume, err := s.LatestVolume(opts.Start)
-    if err != nil {
-        return 0, err
-    }
-
-    nextVolume := latestTargetVolume + 1
-    writerOpenOpts := fs.StreamingWriterOpenOptions{
-        NamespaceID:         s.namespace.ID(),
-        ShardID:             s.ID(),
-        BlockStart:          opts.Start,
-        BlockSize:           s.namespace.Options().RetentionOptions().BlockSize(),
-        VolumeIndex:         nextVolume,
-        PlannedRecordsCount: uint(plannedSeriesCount),
-    }
-    if err = writer.Open(writerOpenOpts); err != nil {
-        return 0, err
-    }
-
     var multiErr xerrors.MultiError

-    processedTileCount, err := s.tileAggregator.AggregateTiles(
-        ctx, sourceNs, targetNs, s.ID(), openBlockReaders, writer, onFlushSeries, opts)
+    processedTileCount, nextVolume, err := s.tileAggregator.AggregateTiles(
+        ctx, sourceNs, targetNs, shardID, onFlushSeries, opts)
     if err != nil {
         // NB: cannot return on the error here, must finish writing.
         multiErr = multiErr.Add(err)
-    }
-
-    if !multiErr.Empty() {
-        if err := writer.Abort(); err != nil {
-            multiErr = multiErr.Add(err)
-        }
-    } else if err := writer.Close(); err != nil {
-        multiErr = multiErr.Add(err)
     } else {
         // Notify all block leasers that a new volume for the namespace/shard/blockstart
         // has been created. This will block until all leasers have relinquished their
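The net effect of the two hunks above is that `TileAggregator` implementations now own the whole I/O lifecycle that `dbShard.AggregateTiles` used to manage: locating the latest volume per source block, opening streaming readers and the target `fs.StreamingWriter`, and reporting the volume index they wrote so the shard can notify block leasers. A minimal conforming sketch; the type name and body are illustrative, the real aggregator lives outside this package:

    type sketchTileAggregator struct{}

    func (a *sketchTileAggregator) AggregateTiles(
        ctx context.Context,
        sourceNs, targetNs Namespace,
        shardID uint32,
        onFlushSeries persist.OnFlushSeries,
        opts AggregateTilesOptions,
    ) (int64, int, error) {
        // An implementation is now expected to open its own source readers
        // and target writer (previously done by dbShard), aggregate tiles of
        // opts.Step duration between opts.Start and opts.End, and return the
        // tile count plus the volume index it wrote. Errors it returns are
        // accumulated by the shard via xerrors.MultiError.
        var (
            processedTileCount int64
            nextVolume         int
        )
        return processedTileCount, nextVolume, nil
    }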
diff --git a/src/dbnode/storage/shard_test.go b/src/dbnode/storage/shard_test.go
index 0b1533e270..b8158ac879 100644
--- a/src/dbnode/storage/shard_test.go
+++ b/src/dbnode/storage/shard_test.go
@@ -1855,14 +1855,11 @@ func TestShardAggregateTiles(t *testing.T) {
     defer ctx.Close()

     var (
-        sourceBlockSize = time.Hour
         targetBlockSize = 2 * time.Hour
         start           = time.Now().Truncate(targetBlockSize)
-        opts            = AggregateTilesOptions{Start: start, End: start.Add(targetBlockSize), Step: 10 * time.Minute}
-
-        firstSourceBlockEntries  = 3
-        secondSourceBlockEntries = 2
-        maxSourceBlockEntries    = 3
+        opts            = AggregateTilesOptions{
+            Start: start, End: start.Add(targetBlockSize), Step: 10 * time.Minute,
+        }

         expectedProcessedTileCount = int64(4)
@@ -1872,90 +1869,25 @@ func TestShardAggregateTiles(t *testing.T) {
     aggregator := NewMockTileAggregator(ctrl)
     testOpts := DefaultTestOptions().SetTileAggregator(aggregator)

-    sourceShard := testDatabaseShard(t, testOpts)
-    defer assert.NoError(t, sourceShard.Close())
-
-    reader0, volume0 := getMockReader(
-        ctrl, t, sourceShard, start, nil)
-    reader0.EXPECT().Entries().Return(firstSourceBlockEntries)
-
-    secondSourceBlockStart := start.Add(sourceBlockSize)
-    reader1, volume1 := getMockReader(
-        ctrl, t, sourceShard, secondSourceBlockStart, nil)
-    reader1.EXPECT().Entries().Return(secondSourceBlockEntries)
-
-    thirdSourceBlockStart := secondSourceBlockStart.Add(sourceBlockSize)
-    reader2, volume2 := getMockReader(
-        ctrl, t, sourceShard, thirdSourceBlockStart, fs.ErrCheckpointFileNotFound)
-
-    blockReaders := []fs.DataFileSetReader{reader0, reader1, reader2}
-    sourceBlockVolumes := []shardBlockVolume{
-        {start, volume0},
-        {secondSourceBlockStart, volume1},
-        {thirdSourceBlockStart, volume2},
-    }
-
     targetShard := testDatabaseShardWithIndexFn(t, testOpts, nil, true)
     defer assert.NoError(t, targetShard.Close())

-    writer := fs.NewMockStreamingWriter(ctrl)
-    gomock.InOrder(
-        writer.EXPECT().Open(fs.StreamingWriterOpenOptions{
-            NamespaceID:         targetShard.namespace.ID(),
-            ShardID:             targetShard.shard,
-            BlockStart:          opts.Start,
-            BlockSize:           targetBlockSize,
-            VolumeIndex:         1,
-            PlannedRecordsCount: uint(maxSourceBlockEntries),
-        }),
-        writer.EXPECT().Close(),
-    )
-
     var (
         noOpColdFlushNs = &persist.NoOpColdFlushNamespace{}
         sourceNs        = NewMockNamespace(ctrl)
         targetNs        = NewMockNamespace(ctrl)
     )

-    sourceNs.EXPECT().ID().Return(sourceShard.namespace.ID())
-
     aggregator.EXPECT().
-        AggregateTiles(ctx, sourceNs, targetNs, sourceShard.ID(), gomock.Len(2), writer,
-            noOpColdFlushNs, opts).
-        Return(expectedProcessedTileCount, nil)
+        AggregateTiles(ctx, sourceNs, targetNs, targetShard.ID(), noOpColdFlushNs, opts).
+        Return(expectedProcessedTileCount, 33, nil)

     processedTileCount, err := targetShard.AggregateTiles(
-        ctx, sourceNs, targetNs, sourceShard.ID(), blockReaders, writer,
-        sourceBlockVolumes, noOpColdFlushNs, opts)
+        ctx, sourceNs, targetNs, targetShard.ID(), noOpColdFlushNs, opts)
     require.NoError(t, err)
     assert.Equal(t, expectedProcessedTileCount, processedTileCount)
 }

-func TestShardAggregateTilesVerifySliceLengths(t *testing.T) {
-    ctrl := xtest.NewController(t)
-    defer ctrl.Finish()
-
-    ctx := context.NewContext()
-    defer ctx.Close()
-
-    targetShard := testDatabaseShardWithIndexFn(t, DefaultTestOptions(), nil, true)
-    defer assert.NoError(t, targetShard.Close())
-
-    var (
-        start              = time.Now()
-        blockReaders       []fs.DataFileSetReader
-        sourceBlockVolumes = []shardBlockVolume{{start, 0}}
-        writer             = fs.NewMockStreamingWriter(ctrl)
-        sourceNs           = NewMockNamespace(ctrl)
-        targetNs           = NewMockNamespace(ctrl)
-    )
-
-    _, err := targetShard.AggregateTiles(
-        ctx, sourceNs, targetNs, 1, blockReaders, writer, sourceBlockVolumes,
-        &persist.NoOpColdFlushNamespace{}, AggregateTilesOptions{})
-    require.EqualError(t, err, "blockReaders and sourceBlockVolumes length mismatch (0 != 1)")
-}
-
 func TestOpenStreamingReader(t *testing.T) {
     ctrl := xtest.NewController(t)
     defer ctrl.Finish()
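In the reworked shard test the aggregator mock returns `33` as an arbitrary next-volume index alongside the tile count; `dbShard.AggregateTiles` consumes that index internally for block-lease notification and still surfaces only the tile count to its caller, which is why the assertions keep their old shape. Reduced sketch of the assertion structure, with illustrative comments:

    aggregator.EXPECT().
        AggregateTiles(ctx, sourceNs, targetNs, targetShard.ID(), noOpColdFlushNs, opts).
        Return(expectedProcessedTileCount, 33, nil) // 33: arbitrary volume index

    processed, err := targetShard.AggregateTiles(
        ctx, sourceNs, targetNs, targetShard.ID(), noOpColdFlushNs, opts)
    require.NoError(t, err)
    // nextVolume is not part of the shard's return value, so only the tile
    // count is asserted.
    assert.Equal(t, expectedProcessedTileCount, processed)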
diff --git a/src/dbnode/storage/storage_mock.go b/src/dbnode/storage/storage_mock.go
index e9bbb5c7c7..2a6252b9cf 100644
--- a/src/dbnode/storage/storage_mock.go
+++ b/src/dbnode/storage/storage_mock.go
@@ -2377,18 +2377,18 @@ func (mr *MockdatabaseShardMockRecorder) DocRef(id interface{}) *gomock.Call {
 }

 // AggregateTiles mocks base method
-func (m *MockdatabaseShard) AggregateTiles(ctx context.Context, sourceNs, targetNs Namespace, shardID uint32, blockReaders []fs.DataFileSetReader, writer fs.StreamingWriter, sourceBlockVolumes []shardBlockVolume, onFlushSeries persist.OnFlushSeries, opts AggregateTilesOptions) (int64, error) {
+func (m *MockdatabaseShard) AggregateTiles(ctx context.Context, sourceNs, targetNs Namespace, shardID uint32, onFlushSeries persist.OnFlushSeries, opts AggregateTilesOptions) (int64, error) {
     m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts)
+    ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, targetNs, shardID, onFlushSeries, opts)
     ret0, _ := ret[0].(int64)
     ret1, _ := ret[1].(error)
     return ret0, ret1
 }

 // AggregateTiles indicates an expected call of AggregateTiles
-func (mr *MockdatabaseShardMockRecorder) AggregateTiles(ctx, sourceNs, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts interface{}) *gomock.Call {
+func (mr *MockdatabaseShardMockRecorder) AggregateTiles(ctx, sourceNs, targetNs, shardID, onFlushSeries, opts interface{}) *gomock.Call {
     mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseShard)(nil).AggregateTiles), ctx, sourceNs, targetNs, shardID, blockReaders, writer, sourceBlockVolumes, onFlushSeries, opts)
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockdatabaseShard)(nil).AggregateTiles), ctx, sourceNs, targetNs, shardID, onFlushSeries, opts)
 }

 // LatestVolume mocks base method
@@ -5335,18 +5335,19 @@ func (m *MockTileAggregator) EXPECT() *MockTileAggregatorMockRecorder {
 }

 // AggregateTiles mocks base method
-func (m *MockTileAggregator) AggregateTiles(ctx context.Context, sourceNs, targetNs Namespace, shardID uint32, readers []fs.DataFileSetReader, writer fs.StreamingWriter, onFlushSeries persist.OnFlushSeries, opts AggregateTilesOptions) (int64, error) {
+func (m *MockTileAggregator) AggregateTiles(ctx context.Context, sourceNs, targetNs Namespace, shardID uint32, onFlushSeries persist.OnFlushSeries, opts AggregateTilesOptions) (int64, int, error) {
     m.ctrl.T.Helper()
-    ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, targetNs, shardID, readers, writer, onFlushSeries, opts)
+    ret := m.ctrl.Call(m, "AggregateTiles", ctx, sourceNs, targetNs, shardID, onFlushSeries, opts)
     ret0, _ := ret[0].(int64)
-    ret1, _ := ret[1].(error)
-    return ret0, ret1
+    ret1, _ := ret[1].(int)
+    ret2, _ := ret[2].(error)
+    return ret0, ret1, ret2
 }

 // AggregateTiles indicates an expected call of AggregateTiles
-func (mr *MockTileAggregatorMockRecorder) AggregateTiles(ctx, sourceNs, targetNs, shardID, readers, writer, onFlushSeries, opts interface{}) *gomock.Call {
+func (mr *MockTileAggregatorMockRecorder) AggregateTiles(ctx, sourceNs, targetNs, shardID, onFlushSeries, opts interface{}) *gomock.Call {
     mr.mock.ctrl.T.Helper()
-    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockTileAggregator)(nil).AggregateTiles), ctx, sourceNs, targetNs, shardID, readers, writer, onFlushSeries, opts)
+    return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AggregateTiles", reflect.TypeOf((*MockTileAggregator)(nil).AggregateTiles), ctx, sourceNs, targetNs, shardID, onFlushSeries, opts)
 }

 // MockNamespaceHooks is a mock of NamespaceHooks interface
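storage_mock.go is generated by MockGen, so the hunks above track the interface changes mechanically rather than by hand-editing; regenerating after a signature change keeps mock and recorder in sync. A generic regeneration directive looks roughly like the following (illustrative only; the repository's actual generation setup may use different flags or a make target):

    //go:generate mockgen -package storage -source types.go -destination storage_mock.go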
diff --git a/src/dbnode/storage/types.go b/src/dbnode/storage/types.go
index f1a8ad8cac..3609710a7e 100644
--- a/src/dbnode/storage/types.go
+++ b/src/dbnode/storage/types.go
@@ -674,9 +674,6 @@ type databaseShard interface {
         sourceNs Namespace,
         targetNs Namespace,
         shardID uint32,
-        blockReaders []fs.DataFileSetReader,
-        writer fs.StreamingWriter,
-        sourceBlockVolumes []shardBlockVolume,
         onFlushSeries persist.OnFlushSeries,
         opts AggregateTilesOptions,
     ) (int64, error)
@@ -1430,11 +1427,9 @@ type TileAggregator interface {
         ctx context.Context,
         sourceNs, targetNs Namespace,
         shardID uint32,
-        readers []fs.DataFileSetReader,
-        writer fs.StreamingWriter,
         onFlushSeries persist.OnFlushSeries,
         opts AggregateTilesOptions,
-    ) (int64, error)
+    ) (processedTileCount int64, nextVolume int, err error)
 }

 // NewTileAggregatorFn creates a new TileAggregator.
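The named results on the updated `TileAggregator` signature document what each value means at the call site. When writing an implementation against the narrowed interface, a compile-time assertion catches signature drift early; sketch, where `sketchTileAggregator` is the illustrative type from the note after the shard.go hunks:

    var _ TileAggregator = (*sketchTileAggregator)(nil)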