[dbnode] Remove readers and writer from aggregator API #3122

Merged · 15 commits · Feb 9, 2021
45 changes: 3 additions & 42 deletions src/dbnode/storage/namespace.go
@@ -1722,6 +1722,7 @@ func (n *dbNamespace) aggregateTiles(
 		targetBlockStart   = opts.Start.Truncate(targetBlockSize)
 		sourceBlockSize    = sourceNs.Options().RetentionOptions().BlockSize()
 		lastSourceBlockEnd = opts.End.Truncate(sourceBlockSize)
+		processedShards    = opts.InsOptions.MetricsScope().Counter("processed-shards")
 	)

 	if targetBlockStart.Add(targetBlockSize).Before(lastSourceBlockEnd) {
@@ -1733,55 +1734,15 @@ func (n *dbNamespace) aggregateTiles(
 		return 0, errNamespaceNotBootstrapped
 	}

-	var (
-		processedShards   = opts.InsOptions.MetricsScope().Counter("processed-shards")
-		targetShards      = n.OwnedShards()
-		bytesPool         = sourceNs.StorageOptions().BytesPool()
-		fsOptions         = sourceNs.StorageOptions().CommitLogOptions().FilesystemOptions()
-		blockReaders      []fs.DataFileSetReader
-		sourceBlockStarts []time.Time
-	)
-
-	for sourceBlockStart := targetBlockStart; sourceBlockStart.Before(lastSourceBlockEnd); sourceBlockStart = sourceBlockStart.Add(sourceBlockSize) {
-		reader, err := fs.NewReader(bytesPool, fsOptions)
-		if err != nil {
-			return 0, err
-		}
-		sourceBlockStarts = append(sourceBlockStarts, sourceBlockStart)
-		blockReaders = append(blockReaders, reader)
-	}
-
 	onColdFlushNs, err := n.opts.OnColdFlush().ColdFlushNamespace(n)
 	if err != nil {
 		return 0, err
 	}

 	var processedTileCount int64
-	for _, targetShard := range targetShards {
-		sourceShard, _, err := sourceNs.ReadableShardAt(targetShard.ID())
-		if err != nil {
-			return 0, fmt.Errorf("no matching shard in source namespace %s: %v", sourceNs.ID(), err)
-		}
-
-		sourceBlockVolumes := make([]shardBlockVolume, 0, len(sourceBlockStarts))
-		for _, sourceBlockStart := range sourceBlockStarts {
-			latestVolume, err := sourceShard.LatestVolume(sourceBlockStart)
-			if err != nil {
-				n.log.Error("error getting shards latest volume",
-					zap.Error(err), zap.Uint32("shard", sourceShard.ID()), zap.Time("blockStart", sourceBlockStart))
-				return 0, err
-			}
-			sourceBlockVolumes = append(sourceBlockVolumes, shardBlockVolume{sourceBlockStart, latestVolume})
-		}
-
-		writer, err := fs.NewStreamingWriter(n.opts.CommitLogOptions().FilesystemOptions())
-		if err != nil {
-			return 0, err
-		}
-
+	for _, targetShard := range n.OwnedShards() {
 		shardProcessedTileCount, err := targetShard.AggregateTiles(
-			ctx, sourceNs, n, sourceShard.ID(), blockReaders, writer, sourceBlockVolumes,
-			onColdFlushNs, opts)
+			ctx, sourceNs, n, targetShard.ID(), onColdFlushNs, opts)

 		processedTileCount += shardProcessedTileCount
 		processedShards.Inc(1)
52 changes: 15 additions & 37 deletions src/dbnode/storage/namespace_test.go
@@ -1426,7 +1426,8 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
 		sourceNsID = ident.StringID("source")
 		targetNsID = ident.StringID("target")
 		start      = time.Now().Truncate(time.Hour)
-		opts       = AggregateTilesOptions{Start: start, End: start.Add(time.Hour)}
+		insOpts    = instrument.NewOptions()
+		opts       = AggregateTilesOptions{Start: start, End: start.Add(time.Hour), InsOptions: insOpts}
 	)

 	sourceNs, sourceCloser := newTestNamespaceWithIDOpts(t, sourceNsID, namespace.NewOptions())
@@ -1445,22 +1446,21 @@ func TestNamespaceAggregateTilesFailUntilBootstrapped(t *testing.T) {
 }

 func TestNamespaceAggregateTiles(t *testing.T) {
-	ctrl := gomock.NewController(t)
+	ctrl := xtest.NewController(t)
 	defer ctrl.Finish()

 	ctx := context.NewContext()
 	defer ctx.Close()

 	var (
-		sourceNsID             = ident.StringID("source")
-		targetNsID             = ident.StringID("target")
-		sourceBlockSize        = time.Hour
-		targetBlockSize        = 2 * time.Hour
-		start                  = time.Now().Truncate(targetBlockSize)
-		secondSourceBlockStart = start.Add(sourceBlockSize)
-		shard0ID               uint32 = 10
-		shard1ID               uint32 = 20
-		insOpts                = instrument.NewOptions()
+		sourceNsID      = ident.StringID("source")
+		targetNsID      = ident.StringID("target")
+		sourceBlockSize = time.Hour
+		targetBlockSize = 2 * time.Hour
+		start           = time.Now().Truncate(targetBlockSize)
+		shard0ID        = uint32(10)
+		shard1ID        = uint32(20)
+		insOpts         = instrument.NewOptions()
 	)

 	opts, err := NewAggregateTilesOptions(start, start.Add(targetBlockSize), time.Second, targetNsID, insOpts)
@@ -1485,42 +1485,20 @@ func TestNamespaceAggregateTiles(t *testing.T) {
 	mockOnColdFlush.EXPECT().ColdFlushNamespace(gomock.Any()).Return(mockOnColdFlushNs, nil)
 	targetNs.opts = targetNs.opts.SetOnColdFlush(mockOnColdFlush)

-	sourceShard0 := NewMockdatabaseShard(ctrl)
-	sourceShard1 := NewMockdatabaseShard(ctrl)
-	sourceNs.shards[0] = sourceShard0
-	sourceNs.shards[1] = sourceShard1
-
-	sourceShard0.EXPECT().ID().Return(shard0ID)
-	sourceShard0.EXPECT().IsBootstrapped().Return(true)
-	sourceShard0.EXPECT().LatestVolume(start).Return(5, nil)
-	sourceShard0.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(15, nil)
-
-	sourceShard1.EXPECT().ID().Return(shard1ID)
-	sourceShard1.EXPECT().IsBootstrapped().Return(true)
-	sourceShard1.EXPECT().LatestVolume(start).Return(7, nil)
-	sourceShard1.EXPECT().LatestVolume(start.Add(sourceBlockSize)).Return(17, nil)
-
 	targetShard0 := NewMockdatabaseShard(ctrl)
 	targetShard1 := NewMockdatabaseShard(ctrl)
 	targetNs.shards[0] = targetShard0
 	targetNs.shards[1] = targetShard1

-	targetShard0.EXPECT().ID().Return(uint32(0))
-	targetShard1.EXPECT().ID().Return(uint32(1))
-
-	sourceBlockVolumes0 := []shardBlockVolume{{start, 5}, {secondSourceBlockStart, 15}}
-	sourceBlockVolumes1 := []shardBlockVolume{{start, 7}, {secondSourceBlockStart, 17}}
+	targetShard0.EXPECT().ID().Return(shard0ID)
+	targetShard1.EXPECT().ID().Return(shard1ID)

 	targetShard0.EXPECT().
-		AggregateTiles(
-			ctx, sourceNs, targetNs, shard0ID, gomock.Len(2), gomock.Any(),
-			sourceBlockVolumes0, gomock.Any(), opts).
+		AggregateTiles(ctx, sourceNs, targetNs, shard0ID, mockOnColdFlushNs, opts).
 		Return(int64(3), nil)

 	targetShard1.EXPECT().
-		AggregateTiles(
-			ctx, sourceNs, targetNs, shard1ID, gomock.Len(2), gomock.Any(),
-			sourceBlockVolumes1, gomock.Any(), opts).
+		AggregateTiles(ctx, sourceNs, targetNs, shard1ID, mockOnColdFlushNs, opts).
 		Return(int64(2), nil)

 	processedTileCount, err := targetNs.AggregateTiles(ctx, sourceNs, opts)
6 changes: 2 additions & 4 deletions src/dbnode/storage/options.go
@@ -946,10 +946,8 @@ func (a *noopTileAggregator) AggregateTiles(
 	ctx context.Context,
 	sourceNs, targetNs Namespace,
 	shardID uint32,
-	blockReaders []fs.DataFileSetReader,
-	writer fs.StreamingWriter,
 	onFlushSeries persist.OnFlushSeries,
 	opts AggregateTilesOptions,
-) (int64, error) {
-	return 0, nil
+) (int64, int, error) {
+	return 0, 0, nil
 }
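
The options.go hunk above captures the new aggregator contract: the tile aggregator no longer receives pre-opened block readers and a streaming writer, and it now reports the volume index it produced alongside the processed-tile count. Below is a minimal self-contained Go sketch of the before/after shape of that method; Namespace, OnFlushSeries, and AggregateTilesOptions here are placeholder types for illustration, not the real m3 types, and the interface names are invented for this sketch.

package sketch

import "context"

// Placeholder types so the sketch compiles on its own; the real code uses
// storage.Namespace, persist.OnFlushSeries, and storage.AggregateTilesOptions.
type (
	Namespace             interface{}
	OnFlushSeries         interface{}
	AggregateTilesOptions struct{}
)

// Before this PR: callers opened the filesystem readers and the streaming
// writer themselves and threaded them through the aggregator.
type tileAggregatorBefore interface {
	AggregateTiles(
		ctx context.Context,
		sourceNs, targetNs Namespace,
		shardID uint32,
		blockReaders []interface{}, // was []fs.DataFileSetReader
		writer interface{}, // was fs.StreamingWriter
		onFlushSeries OnFlushSeries,
		opts AggregateTilesOptions,
	) (int64, error)
}

// After this PR: the aggregator owns its own I/O and additionally returns
// the next volume index, which shard.go uses when notifying block leasers.
type tileAggregatorAfter interface {
	AggregateTiles(
		ctx context.Context,
		sourceNs, targetNs Namespace,
		shardID uint32,
		onFlushSeries OnFlushSeries,
		opts AggregateTilesOptions,
	) (int64, int, error)
}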
116 changes: 32 additions & 84 deletions src/dbnode/storage/shard.go
@@ -2669,100 +2669,19 @@ func (s *dbShard) AggregateTiles(
 	ctx context.Context,
 	sourceNs, targetNs Namespace,
 	shardID uint32,
-	blockReaders []fs.DataFileSetReader,
-	writer fs.StreamingWriter,
-	sourceBlockVolumes []shardBlockVolume,
 	onFlushSeries persist.OnFlushSeries,
 	opts AggregateTilesOptions,
 ) (int64, error) {
-	if len(blockReaders) != len(sourceBlockVolumes) {
-		return 0, fmt.Errorf(
-			"blockReaders and sourceBlockVolumes length mismatch (%d != %d)",
-			len(blockReaders),
-			len(sourceBlockVolumes))
-	}
-
-	openBlockReaders := make([]fs.DataFileSetReader, 0, len(blockReaders))
-	defer func() {
-		for _, reader := range openBlockReaders {
-			if err := reader.Close(); err != nil {
-				s.logger.Error("could not close DataFileSetReader", zap.Error(err))
-			}
-		}
-	}()
-
-	var (
-		sourceNsID         = sourceNs.ID()
-		plannedSeriesCount = 1
-	)
-
-	for sourceBlockPos, blockReader := range blockReaders {
-		sourceBlockVolume := sourceBlockVolumes[sourceBlockPos]
-		openOpts := fs.DataReaderOpenOptions{
-			Identifier: fs.FileSetFileIdentifier{
-				Namespace:   sourceNsID,
-				Shard:       shardID,
-				BlockStart:  sourceBlockVolume.blockStart,
-				VolumeIndex: sourceBlockVolume.latestVolume,
-			},
-			FileSetType:      persist.FileSetFlushType,
-			StreamingEnabled: true,
-		}
-
-		if err := blockReader.Open(openOpts); err != nil {
-			if err == fs.ErrCheckpointFileNotFound {
-				// A very recent source block might not have been flushed yet.
-				continue
-			}
-			s.logger.Error("blockReader.Open",
-				zap.Error(err),
-				zap.Time("blockStart", sourceBlockVolume.blockStart),
-				zap.Int("volumeIndex", sourceBlockVolume.latestVolume))
-			return 0, err
-		}
-
-		entries := blockReader.Entries()
-		if entries > plannedSeriesCount {
-			plannedSeriesCount = entries
-		}
-
-		openBlockReaders = append(openBlockReaders, blockReader)
-	}
-
-	latestTargetVolume, err := s.LatestVolume(opts.Start)
-	if err != nil {
-		return 0, err
-	}
-
-	nextVolume := latestTargetVolume + 1
-	writerOpenOpts := fs.StreamingWriterOpenOptions{
-		NamespaceID:         s.namespace.ID(),
-		ShardID:             s.ID(),
-		BlockStart:          opts.Start,
-		BlockSize:           s.namespace.Options().RetentionOptions().BlockSize(),
-		VolumeIndex:         nextVolume,
-		PlannedRecordsCount: uint(plannedSeriesCount),
-	}
-	if err = writer.Open(writerOpenOpts); err != nil {
-		return 0, err
-	}
-
 	var multiErr xerrors.MultiError

-	processedTileCount, err := s.tileAggregator.AggregateTiles(
-		ctx, sourceNs, targetNs, s.ID(), openBlockReaders, writer, onFlushSeries, opts)
+	processedTileCount, nextVolume, err := s.tileAggregator.AggregateTiles(
+		ctx, sourceNs, targetNs, shardID, onFlushSeries, opts)
 	if err != nil {
 		// NB: cannot return on the error here, must finish writing.
 		multiErr = multiErr.Add(err)
 	}

-	if !multiErr.Empty() {
-		if err := writer.Abort(); err != nil {
-			multiErr = multiErr.Add(err)
-		}
-	} else if err := writer.Close(); err != nil {
-		multiErr = multiErr.Add(err)
-	} else {
+	if multiErr.Empty() {
 		// Notify all block leasers that a new volume for the namespace/shard/blockstart
 		// has been created. This will block until all leasers have relinquished their
 		// leases.
@@ -2810,6 +2729,35 @@ func (s *dbShard) LatestVolume(blockStart time.Time) (int, error) {
 	return s.namespaceReaderMgr.latestVolume(s.shard, blockStart)
 }

+func (s *dbShard) OpenStreamingReader(blockStart time.Time) (fs.DataFileSetReader, error) {
+	latestVolume, err := s.LatestVolume(blockStart)
+	if err != nil {
+		return nil, err
+	}
+
+	reader, err := s.newReaderFn(s.opts.BytesPool(), s.opts.CommitLogOptions().FilesystemOptions())
+	if err != nil {
+		return nil, err
+	}
+
+	openOpts := fs.DataReaderOpenOptions{
+		Identifier: fs.FileSetFileIdentifier{
+			Namespace:   s.namespace.ID(),
+			Shard:       s.ID(),
+			BlockStart:  blockStart,
+			VolumeIndex: latestVolume,
+		},
+		FileSetType:      persist.FileSetFlushType,
+		StreamingEnabled: true,
+	}
+
+	if err := reader.Open(openOpts); err != nil {
+		return nil, err
+	}
+
+	return reader, nil
+}
+
 func (s *dbShard) ScanData(
 	blockStart time.Time,
 	processor fs.DataEntryProcessor,
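
The new OpenStreamingReader helper added at the end of shard.go is what lets the aggregator open its own readers instead of having them passed in. A hedged sketch of how a caller might use it to replace the reader-opening loop this PR deleted from namespace.go: the function name openSourceReaders is invented for this sketch, and it assumes the databaseShard interface exposes OpenStreamingReader; close handling and error policy are assumptions, not code from this PR.

// openSourceReaders opens one streaming reader per source block at its
// latest volume, mirroring the per-block loop removed from namespace.go.
func openSourceReaders(
	sourceShard databaseShard, // assumed to expose the new OpenStreamingReader
	targetBlockStart, lastSourceBlockEnd time.Time,
	sourceBlockSize time.Duration,
) ([]fs.DataFileSetReader, error) {
	var readers []fs.DataFileSetReader
	for bs := targetBlockStart; bs.Before(lastSourceBlockEnd); bs = bs.Add(sourceBlockSize) {
		reader, err := sourceShard.OpenStreamingReader(bs)
		if err != nil {
			// Release any readers opened before the failure.
			for _, r := range readers {
				_ = r.Close()
			}
			return nil, err
		}
		readers = append(readers, reader)
	}
	return readers, nil
}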