From 24dd26259c049c30cafe3f0bc93642cd96c8f61d Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Tue, 1 Dec 2020 11:51:41 +0200 Subject: [PATCH 1/3] [dbnode] Prevent potential division by 0 in StreamingWriter --- src/dbnode/persist/fs/streaming_write.go | 10 ++++-- src/dbnode/persist/fs/streaming_write_test.go | 33 +++++++++++++++---- src/dbnode/storage/shard.go | 10 +++--- 3 files changed, 39 insertions(+), 14 deletions(-) diff --git a/src/dbnode/persist/fs/streaming_write.go b/src/dbnode/persist/fs/streaming_write.go index 010021a464..3e4dc0bdc8 100644 --- a/src/dbnode/persist/fs/streaming_write.go +++ b/src/dbnode/persist/fs/streaming_write.go @@ -83,6 +83,11 @@ func NewStreamingWriter(opts Options) (StreamingWriter, error) { } func (w *streamingWriter) Open(opts StreamingWriterOpenOptions) error { + if opts.PlannedRecordsCount <= 0 { + return fmt.Errorf( + "PlannedRecordsCount must be positive, got %d", opts.PlannedRecordsCount) + } + writerOpts := DataWriterOpenOptions{ BlockSize: opts.BlockSize, Identifier: FileSetFileIdentifier{ @@ -105,9 +110,10 @@ func (w *streamingWriter) Open(opts StreamingWriterOpenOptions) error { w.bloomFilter = bloom.NewBloomFilter(m, k) summariesApprox := float64(opts.PlannedRecordsCount) * w.options.IndexSummariesPercent() - w.summaryEvery = 0 + w.summaryEvery = 1 if summariesApprox > 0 { - w.summaryEvery = int64(math.Floor(float64(opts.PlannedRecordsCount) / summariesApprox)) + w.summaryEvery = int64(math.Max(1, + math.Floor(float64(opts.PlannedRecordsCount)/summariesApprox))) } if err := w.writer.Open(writerOpts); err != nil { diff --git a/src/dbnode/persist/fs/streaming_write_test.go b/src/dbnode/persist/fs/streaming_write_test.go index bb43793853..06a8ce1fc7 100644 --- a/src/dbnode/persist/fs/streaming_write_test.go +++ b/src/dbnode/persist/fs/streaming_write_test.go @@ -60,14 +60,14 @@ func newTestStreamingWriter( require.NoError(t, err) writerOpenOpts := StreamingWriterOpenOptions{ - NamespaceID: testNs1ID, - ShardID: shard, - BlockStart: timestamp, - BlockSize: testBlockSize, + NamespaceID: testNs1ID, + ShardID: shard, + BlockStart: timestamp, + BlockSize: testBlockSize, - VolumeIndex: nextVersion, - PlannedRecordsCount: plannedEntries, - } + VolumeIndex: nextVersion, + PlannedRecordsCount: plannedEntries, + } err = writer.Open(writerOpenOpts) require.NoError(t, err) @@ -184,6 +184,25 @@ func TestReadStreamingWriteEmptyFileset(t *testing.T) { readTestData(t, r, 0, testWriterStart, nil) } +func TestReadStreamingWriteReject0PlannedRecordsCount(t *testing.T) { + dir := createTempDir(t) + filePathPrefix := filepath.Join(dir, "") + defer os.RemoveAll(dir) // nolint: errcheck + + writer, err := NewStreamingWriter(testDefaultOpts. + SetFilePathPrefix(filePathPrefix). + SetWriterBufferSize(testWriterBufferSize)) + require.NoError(t, err) + + writerOpenOpts := StreamingWriterOpenOptions{ + NamespaceID: testNs1ID, + BlockSize: testBlockSize, + PlannedRecordsCount: 0, + } + err = writer.Open(writerOpenOpts) + require.EqualError(t, err, "PlannedRecordsCount must be positive, got 0") +} + func TestStreamingWriterAbort(t *testing.T) { dir := createTempDir(t) filePathPrefix := filepath.Join(dir, "") diff --git a/src/dbnode/storage/shard.go b/src/dbnode/storage/shard.go index 56d61d776d..f8e39f7084 100644 --- a/src/dbnode/storage/shard.go +++ b/src/dbnode/storage/shard.go @@ -2688,8 +2688,8 @@ func (s *dbShard) AggregateTiles( }() var ( - sourceNsID = sourceNs.ID() - maxEntries = 0 + sourceNsID = sourceNs.ID() + plannedSeriesCount = 1 ) for sourceBlockPos, blockReader := range blockReaders { @@ -2718,8 +2718,8 @@ func (s *dbShard) AggregateTiles( } entries := blockReader.Entries() - if entries > maxEntries { - maxEntries = entries + if entries > plannedSeriesCount { + plannedSeriesCount = entries } openBlockReaders = append(openBlockReaders, blockReader) @@ -2737,7 +2737,7 @@ func (s *dbShard) AggregateTiles( BlockStart: opts.Start, BlockSize: s.namespace.Options().RetentionOptions().BlockSize(), VolumeIndex: nextVolume, - PlannedRecordsCount: uint(maxEntries), + PlannedRecordsCount: uint(plannedSeriesCount), } if err = writer.Open(writerOpenOpts); err != nil { return 0, err From 651cc8f1ad9ac063d035763dd2570db7da4f3392 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Tue, 1 Dec 2020 13:37:30 +0200 Subject: [PATCH 2/3] Fix tests --- src/dbnode/persist/fs/streaming_write_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/dbnode/persist/fs/streaming_write_test.go b/src/dbnode/persist/fs/streaming_write_test.go index 06a8ce1fc7..6482f2a2d2 100644 --- a/src/dbnode/persist/fs/streaming_write_test.go +++ b/src/dbnode/persist/fs/streaming_write_test.go @@ -174,7 +174,7 @@ func TestReadStreamingWriteEmptyFileset(t *testing.T) { filePathPrefix := filepath.Join(dir, "") defer os.RemoveAll(dir) - w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 0) + w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 1) err := streamingWriteTestData(t, w, testWriterStart, nil) require.NoError(t, err) err = w.Close() @@ -208,7 +208,7 @@ func TestStreamingWriterAbort(t *testing.T) { filePathPrefix := filepath.Join(dir, "") defer os.RemoveAll(dir) - w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 0) + w := newTestStreamingWriter(t, filePathPrefix, 0, testWriterStart, 0, 1) err := streamingWriteTestData(t, w, testWriterStart, nil) require.NoError(t, err) err = w.Abort() From 007e6a5334a81f2e6fa2bade907f94412b256946 Mon Sep 17 00:00:00 2001 From: Linas Medziunas Date: Wed, 2 Dec 2020 15:34:07 +0200 Subject: [PATCH 3/3] Additional hardening against division by 0 --- src/dbnode/persist/fs/streaming_write.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/dbnode/persist/fs/streaming_write.go b/src/dbnode/persist/fs/streaming_write.go index 3e4dc0bdc8..4120ff4711 100644 --- a/src/dbnode/persist/fs/streaming_write.go +++ b/src/dbnode/persist/fs/streaming_write.go @@ -52,11 +52,14 @@ type StreamingWriter interface { // StreamingWriterOpenOptions in the options for the StreamingWriter. type StreamingWriterOpenOptions struct { - NamespaceID ident.ID - ShardID uint32 - BlockStart time.Time - BlockSize time.Duration - VolumeIndex int + NamespaceID ident.ID + ShardID uint32 + BlockStart time.Time + BlockSize time.Duration + VolumeIndex int + + // PlannedRecordsCount is an estimate of the number of series to be written. + // Must be greater than 0. PlannedRecordsCount uint } @@ -188,7 +191,8 @@ func (w *streamingWriter) writeIndexRelated( // time window w.bloomFilter.Add(id) - if entry.index%w.summaryEvery == 0 { + writeSummary := w.summaryEvery == 0 || entry.index%w.summaryEvery == 0 + if writeSummary { // Capture the offset for when we write this summary back, only capture // for every summary we'll actually write to avoid a few memcopies entry.indexFileOffset = w.indexOffset @@ -200,7 +204,7 @@ func (w *streamingWriter) writeIndexRelated( } w.indexOffset += length - if entry.index%w.summaryEvery == 0 { + if writeSummary { err = w.writer.writeSummariesEntry(id, entry) if err != nil { return err