
[dbnode] Remove readers and writer from aggregator API #3122

Merged: 15 commits, Feb 9, 2021
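A summary inferred from the PR title and the diffs below: TileAggregator.AggregateTiles no longer receives pre-opened block readers or a streaming writer (that setup moves inside the aggregator), and it now also returns the volume index it wrote; dbShard.AggregateTiles and dbNamespace.aggregateTiles shrink accordingly. A minimal Go sketch of the inferred before/after interface shape; every type here is a simplified local placeholder, not the real dbnode type:

```go
// Sketch of the API change inferred from the options.go and shard.go diffs
// below; all types are local placeholders, not the real dbnode/fs/persist types.
package sketch

import "time"

type (
	Context               struct{}
	Namespace             struct{}
	DataFileSetReader     struct{}
	StreamingWriter       struct{}
	OnFlushSeries         struct{}
	AggregateTilesOptions struct {
		Start, End time.Time
		Step       time.Duration
	}
)

// Before: callers (dbNamespace / dbShard) opened per-block readers and a
// streaming writer and threaded them through to the aggregator.
type tileAggregatorBefore interface {
	AggregateTiles(
		ctx Context,
		sourceNs, targetNs Namespace,
		shardID uint32,
		blockReaders []DataFileSetReader,
		writer StreamingWriter,
		onFlushSeries OnFlushSeries,
		opts AggregateTilesOptions,
	) (processedTileCount int64, err error)
}

// After: reader/writer handling lives inside the aggregator, which now also
// reports the volume index it wrote (captured as nextVolume in shard.go).
type tileAggregatorAfter interface {
	AggregateTiles(
		ctx Context,
		sourceNs, targetNs Namespace,
		shardID uint32,
		onFlushSeries OnFlushSeries,
		opts AggregateTilesOptions,
	) (processedTileCount int64, nextVolume int, err error)
}
```

If this reading is right, the shard and namespace layers no longer need to know about fs readers, writers, or source block volumes at all, which is what the deletions in namespace.go and shard.go below reflect.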
47 changes: 4 additions & 43 deletions src/dbnode/storage/namespace.go
@@ -1721,6 +1721,7 @@ func (n *dbNamespace) aggregateTiles(
targetBlockStart = opts.Start.Truncate(targetBlockSize)
sourceBlockSize = sourceNs.Options().RetentionOptions().BlockSize()
lastSourceBlockEnd = opts.End.Truncate(sourceBlockSize)
processedShards = opts.InsOptions.MetricsScope().Counter("processed-shards")
)

if targetBlockStart.Add(targetBlockSize).Before(lastSourceBlockEnd) {
@@ -1732,24 +1733,6 @@ func (n *dbNamespace) aggregateTiles(
return 0, errNamespaceNotBootstrapped
}

var (
processedShards = opts.InsOptions.MetricsScope().Counter("processed-shards")
targetShards = n.OwnedShards()
bytesPool = sourceNs.StorageOptions().BytesPool()
fsOptions = sourceNs.StorageOptions().CommitLogOptions().FilesystemOptions()
blockReaders []fs.DataFileSetReader
sourceBlockStarts []time.Time
)

for sourceBlockStart := targetBlockStart; sourceBlockStart.Before(lastSourceBlockEnd); sourceBlockStart = sourceBlockStart.Add(sourceBlockSize) {
reader, err := fs.NewReader(bytesPool, fsOptions)
if err != nil {
return 0, err
}
sourceBlockStarts = append(sourceBlockStarts, sourceBlockStart)
blockReaders = append(blockReaders, reader)
}

// Cold flusher builds the reverse index for target (current) ns.
onColdFlushNs, err := n.opts.OnColdFlush().ColdFlushNamespace(n)
if err != nil {
@@ -1764,37 +1747,15 @@ func (n *dbNamespace) aggregateTiles(
if aggregationSuccess {
return
}
// Abort buildling reverse index if aggregation fails.
// Abort building reverse index if aggregation fails.
if err := onColdFlushNs.Abort(); err != nil {
n.log.Error("error aborting cold flush",
zap.Stringer("sourceNs", sourceNs.ID()), zap.Error(err))
}
}()
for _, targetShard := range targetShards {
sourceShard, _, err := sourceNs.ReadableShardAt(targetShard.ID())
if err != nil {
return 0, fmt.Errorf("no matching shard in source namespace %s: %v", sourceNs.ID(), err)
}

sourceBlockVolumes := make([]shardBlockVolume, 0, len(sourceBlockStarts))
for _, sourceBlockStart := range sourceBlockStarts {
latestVolume, err := sourceShard.LatestVolume(sourceBlockStart)
if err != nil {
n.log.Error("error getting shards latest volume",
zap.Error(err), zap.Uint32("shard", sourceShard.ID()), zap.Time("blockStart", sourceBlockStart))
return 0, err
}
sourceBlockVolumes = append(sourceBlockVolumes, shardBlockVolume{sourceBlockStart, latestVolume})
}

writer, err := fs.NewStreamingWriter(n.opts.CommitLogOptions().FilesystemOptions())
if err != nil {
return 0, err
}

for _, targetShard := range n.OwnedShards() {
shardProcessedTileCount, err := targetShard.AggregateTiles(
ctx, sourceNs, n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes,
onColdFlushNs, opts)
ctx, sourceNs, n, targetShard.ID(), onColdFlushNs, opts)

processedTileCount += shardProcessedTileCount
processedShards.Inc(1)
52 changes: 15 additions & 37 deletions src/dbnode/storage/namespace_test.go
@@ -1426,7 +1426,8 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
start = time.Now().Truncate(time.Hour)
opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)}
insOpts = instrument.NewOptions()
opts = AggregateTilesOptions{Start: start, End: start.Add(time.Hour), InsOptions: insOpts}
)

sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions())
@@ -1445,22 +1446,21 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
}

func TestNamespaceAggregateTiles(t *testing.T) {
ctrl := gomock.NewController(t)
ctrl := xtest.NewController(t)
defer ctrl.Finish()

ctx := context.NewContext()
defer ctx.Close()

var (
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
sourceBlockSize = time.Hour
targetBlockSize = 2 * time.Hour
start = time.Now().Truncate(targetBlockSize)
secondSourceBlockStart = start.Add(sourceBlockSize)
shard0ID uint32 = 10
shard1ID uint32 = 20
insOpts = instrument.NewOptions()
sourceNsID = ident.StringID("source")
targetNsID = ident.StringID("target")
sourceBlockSize = time.Hour
targetBlockSize = 2 * time.Hour
start = time.Now().Truncate(targetBlockSize)
shard0ID = uint32(10)
shard1ID = uint32(20)
insOpts = instrument.NewOptions()
)

opts, err := NewAggregateTilesOptions(start, start.Add(targetBlockSize), time.Second, targetNsID, insOpts)
@@ -1485,42 +1485,20 @@ func TestNamespaceAggregateTiles(t *testing.T) {
mockOnColdFlush.EXPECT().ColdFlushNamespace(gomock.Any()).Return(mockOnColdFlushNs, nil)
targetNs.opts = targetNs.opts.SetOnColdFlush(mockOnColdFlush)

sourceShard0 := NewMockdatabaseShard(ctrl)
sourceShard1 := NewMockdatabaseShard(ctrl)
sourceNs.shards[0] = sourceShard0
sourceNs.shards[1] = sourceShard1

sourceShard0.EXPECT().ID().Return(shard0ID)
sourceShard0.EXPECT().IsBootstrapped().Return(true)
sourceShard0.EXPECT().LatestVolume(start).Return(5, nil)
sourceShard0.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(15, nil)

sourceShard1.EXPECT().ID().Return(shard1ID)
sourceShard1.EXPECT().IsBootstrapped().Return(true)
sourceShard1.EXPECT().LatestVolume(start).Return(7, nil)
sourceShard1.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(17, nil)

targetShard0 := NewMockdatabaseShard(ctrl)
targetShard1 := NewMockdatabaseShard(ctrl)
targetNs.shards[0] = targetShard0
targetNs.shards[1] = targetShard1

targetShard0.EXPECT().ID().Return(uint32(0))
targetShard1.EXPECT().ID().Return(uint32(1))

sourceBlockVolumes0 := []shardBlockVolume{{start, 5}, {secondSourceBlockStart, 15}}
sourceBlockVolumes1 := []shardBlockVolume{{start, 7}, {secondSourceBlockStart, 17}}
targetShard0.EXPECT().ID().Return(shard0ID)
targetShard1.EXPECT().ID().Return(shard1ID)

targetShard0.EXPECT().
AggregateTiles(
ctx, sourceNs, targetNs, shard0ID, gomock.Len(2), gomock.Any(),
sourceBlockVolumes0, gomock.Any(), opts).
AggregateTiles(ctx, sourceNs, targetNs, shard0ID, mockOnColdFlushNs, opts).
Return(int64(3), nil)

targetShard1.EXPECT().
AggregateTiles(
ctx, sourceNs, targetNs, shard1ID, gomock.Len(2), gomock.Any(),
sourceBlockVolumes1, gomock.Any(), opts).
AggregateTiles(ctx, sourceNs, targetNs, shard1ID, mockOnColdFlushNs, opts).
Return(int64(2), nil)

processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts)
6 changes: 2 additions & 4 deletions src/dbnode/storage/options.go
@@ -946,10 +946,8 @@ func (a *noopTileAggregator) AggregateTiles(
ctx context.Context,
sourceNs, targetNs Namespace,
shardID uint32,
blockReaders []fs.DataFileSetReader,
writer fs.StreamingWriter,
onFlushSeries persist.OnFlushSeries,
opts AggregateTilesOptions,
) (int64, error) {
return 0, nil
) (int64, int, error) {
return 0, 0, nil
}
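The no-op change above reflects the widened return: the extra int is the volume index the aggregator wrote, which shard.go (next file) captures as nextVolume and, per the comment surviving there, uses when notifying block leasers about the new fileset volume. A small hedged sketch of that call pattern; aggregateFn, aggregateShard, and notifyBlockLeasers are illustrative placeholders, not real dbnode identifiers:

```go
// Hedged sketch of consuming the new (count, volume, error) return shape.
package sketch

import "fmt"

// aggregateFn stands in for TileAggregator.AggregateTiles after this PR; only
// the return shape is taken from the diff.
type aggregateFn func(shardID uint32) (processedTileCount int64, nextVolume int, err error)

// aggregateShard mirrors the call site: the real shard.go folds the error into
// a MultiError rather than returning immediately, and on success advertises
// the new volume to block leasers for the namespace/shard/blockstart.
func aggregateShard(aggregate aggregateFn, shardID uint32) (int64, error) {
	processedTileCount, nextVolume, err := aggregate(shardID)
	if err != nil {
		return 0, fmt.Errorf("aggregate tiles for shard %d: %w", shardID, err)
	}
	notifyBlockLeasers(shardID, nextVolume)
	return processedTileCount, nil
}

func notifyBlockLeasers(shardID uint32, volume int) {
	// Placeholder only; the real notification goes through the block lease
	// manager, which is not shown in this diff.
}
```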
87 changes: 2 additions & 85 deletions src/dbnode/storage/shard.go
@@ -2669,99 +2669,16 @@ func (s *dbShard) AggregateTiles(
ctx context.Context,
sourceNs, targetNs Namespace,
shardID uint32,
blockReaders []fs.DataFileSetReader,
writer fs.StreamingWriter,
sourceBlockVolumes []shardBlockVolume,
onFlushSeries persist.OnFlushSeries,
opts AggregateTilesOptions,
) (int64, error) {
if len(blockReaders) != len(sourceBlockVolumes) {
return 0, fmt.Errorf(
"blockReaders and sourceBlockVolumes length mismatch (%d != %d)",
len(blockReaders),
len(sourceBlockVolumes))
}

openBlockReaders := make([]fs.DataFileSetReader, 0, len(blockReaders))
defer func() {
for _, reader := range openBlockReaders {
if err := reader.Close(); err != nil {
s.logger.Error("could not close DataFileSetReader", zap.Error(err))
}
}
}()

var (
sourceNsID = sourceNs.ID()
plannedSeriesCount = 1
)

for sourceBlockPos, blockReader := range blockReaders {
sourceBlockVolume := sourceBlockVolumes[sourceBlockPos]
openOpts := fs.DataReaderOpenOptions{
Identifier: fs.FileSetFileIdentifier{
Namespace: sourceNsID,
Shard: shardID,
BlockStart: sourceBlockVolume.blockStart,
VolumeIndex: sourceBlockVolume.latestVolume,
},
FileSetType: persist.FileSetFlushType,
StreamingEnabled: true,
}

if err := blockReader.Open(openOpts); err != nil {
if err == fs.ErrCheckpointFileNotFound {
// A very recent source block might not have been flushed yet.
continue
}
s.logger.Error("blockReader.Open",
zap.Error(err),
zap.Time("blockStart", sourceBlockVolume.blockStart),
zap.Int("volumeIndex", sourceBlockVolume.latestVolume))
return 0, err
}

entries := blockReader.Entries()
if entries > plannedSeriesCount {
plannedSeriesCount = entries
}

openBlockReaders = append(openBlockReaders, blockReader)
}

latestTargetVolume, err := s.LatestVolume(opts.Start)
if err != nil {
return 0, err
}

nextVolume := latestTargetVolume + 1
writerOpenOpts := fs.StreamingWriterOpenOptions{
NamespaceID: s.namespace.ID(),
ShardID: s.ID(),
BlockStart: opts.Start,
BlockSize: s.namespace.Options().RetentionOptions().BlockSize(),
VolumeIndex: nextVolume,
PlannedRecordsCount: uint(plannedSeriesCount),
}
if err = writer.Open(writerOpenOpts); err != nil {
return 0, err
}

var multiErr xerrors.MultiError

processedTileCount, err := s.tileAggregator.AggregateTiles(
ctx, sourceNs, targetNs, s.ID(), openBlockReaders, writer, onFlushSeries, opts)
processedTileCount, nextVolume, err := s.tileAggregator.AggregateTiles(
ctx, sourceNs, targetNs, shardID, onFlushSeries, opts)
if err != nil {
// NB: cannot return on the error here, must finish writing.
multiErr = multiErr.Add(err)
}

if !multiErr.Empty() {
if err := writer.Abort(); err != nil {
multiErr = multiErr.Add(err)
}
} else if err := writer.Close(); err != nil {
multiErr = multiErr.Add(err)
} else {
// Notify all block leasers that a new volume for the namespace/shard/blockstart
// has been created. This will block until all leasers have relinquished their
80 changes: 6 additions & 74 deletions src/dbnode/storage/shard_test.go
@@ -1855,14 +1855,11 @@ func TestShardAggregateTiles(t *testing.T) {
defer ctx.Close()

var (
sourceBlockSize = time.Hour
targetBlockSize = 2 * time.Hour
start = time.Now().Truncate(targetBlockSize)
opts = AggregateTilesOptions{Start: start, End: start.Add(targetBlockSize), Step: 10 * time.Minute}

firstSourceBlockEntries = 3
secondSourceBlockEntries = 2
maxSourceBlockEntries = 3
opts = AggregateTilesOptions{
Start: start, End: start.Add(targetBlockSize), Step: 10 * time.Minute,
}

expectedProcessedTileCount = int64(4)

@@ -1872,90 +1869,25 @@ func TestShardAggregateTiles(t *testing.T) {
aggregator := NewMockTileAggregator(ctrl)
testOpts := DefaultTestOptions().SetTileAggregator(aggregator)

sourceShard := testDatabaseShard(t, testOpts)
defer assert.NoError(t, sourceShard.Close())

reader0, volume0 := getMockReader(
ctrl, t, sourceShard, start, nil)
reader0.EXPECT().Entries().Return(firstSourceBlockEntries)

secondSourceBlockStart := start.Add(sourceBlockSize)
reader1, volume1 := getMockReader(
ctrl, t, sourceShard, secondSourceBlockStart, nil)
reader1.EXPECT().Entries().Return(secondSourceBlockEntries)

thirdSourceBlockStart := secondSourceBlockStart.Add(sourceBlockSize)
reader2, volume2 := getMockReader(
ctrl, t, sourceShard, thirdSourceBlockStart, fs.ErrCheckpointFileNotFound)

blockReaders := []fs.DataFileSetReader{reader0, reader1, reader2}
sourceBlockVolumes := []shardBlockVolume{
{start, volume0},
{secondSourceBlockStart, volume1},
{thirdSourceBlockStart, volume2},
}

targetShard := testDatabaseShardWithIndexFn(t, testOpts, nil, true)
defer assert.NoError(t, targetShard.Close())

writer := fs.NewMockStreamingWriter(ctrl)
gomock.InOrder(
writer.EXPECT().Open(fs.StreamingWriterOpenOptions{
NamespaceID: targetShard.namespace.ID(),
ShardID: targetShard.shard,
BlockStart: opts.Start,
BlockSize: targetBlockSize,
VolumeIndex: 1,
PlannedRecordsCount: uint(maxSourceBlockEntries),
}),
writer.EXPECT().Close(),
)

var (
noOpColdFlushNs = &persist.NoOpColdFlushNamespace{}
sourceNs = NewMockNamespace(ctrl)
targetNs = NewMockNamespace(ctrl)
)

sourceNs.EXPECT().ID().Return(sourceShard.namespace.ID())

aggregator.EXPECT().
AggregateTiles(ctx, sourceNs, targetNs, sourceShard.ID(), gomock.Len(2), writer,
noOpColdFlushNs, opts).
Return(expectedProcessedTileCount, nil)
AggregateTiles(ctx, sourceNs, targetNs, targetShard.ID(), noOpColdFlushNs, opts).
Return(expectedProcessedTileCount, 33, nil)

processedTileCount, err := targetShard.AggregateTiles(
ctx, sourceNs, targetNs, sourceShard.ID(), blockReaders, writer,
sourceBlockVolumes, noOpColdFlushNs, opts)
ctx, sourceNs, targetNs, targetShard.ID(), noOpColdFlushNs, opts)
require.NoError(t, err)
assert.Equal(t, expectedProcessedTileCount, processedTileCount)
}

func TestShardAggregateTilesVerifySliceLengths(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()

ctx := context.NewContext()
defer ctx.Close()

targetShard := testDatabaseShardWithIndexFn(t, DefaultTestOptions(), nil, true)
defer assert.NoError(t, targetShard.Close())

var (
start = time.Now()
blockReaders []fs.DataFileSetReader
sourceBlockVolumes = []shardBlockVolume{{start, 0}}
writer = fs.NewMockStreamingWriter(ctrl)
sourceNs = NewMockNamespace(ctrl)
targetNs = NewMockNamespace(ctrl)
)

_, err := targetShard.AggregateTiles(
ctx, sourceNs, targetNs, 1, blockReaders, writer, sourceBlockVolumes,
&persist.NoOpColdFlushNamespace{}, AggregateTilesOptions{})
require.EqualError(t, err, "blockReaders and sourceBlockVolumes length mismatch (0 != 1)")
}

func TestOpenStreamingReader(t *testing.T) {
ctrl := xtest.NewController(t)
defer ctrl.Finish()