Skip to content

Commit

Permalink
PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
nbroyles committed Nov 9, 2021
1 parent 85f5f7f commit 9fdd964
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 71 deletions.
11 changes: 6 additions & 5 deletions src/dbnode/storage/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/resource"
xtime "github.com/m3db/m3/src/x/time"
)

Expand Down Expand Up @@ -156,19 +157,19 @@ func (entry *Entry) IndexedRange() (xtime.UnixNano, xtime.UnixNano) {
// is an error during retrieval, simply returns the current entry. Additionally,
// returns a cleanup function to run once finished using the reconciled entry and
// a boolean value indicating whether the result came from reconciliation or not.
func (entry *Entry) ReconciledOnIndexSeries() (doc.OnIndexSeries, doc.ReconciledOnIndexSeriesCleanupFn, bool) {
func (entry *Entry) ReconciledOnIndexSeries() (doc.OnIndexSeries, resource.SimpleCloser, bool) {
if entry.insertTime.Load() > 0 {
return entry, func() {}, false
return entry, resource.SimpleCloserFn(func() {}), false
}

e, _, err := entry.Shard.TryRetrieveSeriesAndIncrementReaderWriterCount(entry.ID)
if err != nil || e == nil {
return entry, func() {}, false
return entry, resource.SimpleCloserFn(func() {}), false
}

return e, func() {
return e, resource.SimpleCloserFn(func() {
e.DecrementReaderWriterCount()
}, true
}), true
}

// NeedsIndexUpdate returns a bool to indicate if the Entry needs to be indexed
Expand Down
8 changes: 4 additions & 4 deletions src/dbnode/storage/entry_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,16 +234,16 @@ func TestReconciledOnIndexSeries(t *testing.T) {
_ = addMockSeries(ctrl, shard, series.ID(), ident.Tags{}, 1)

// Validate we perform the reconciliation.
e, cleanup, reconciled := entry.ReconciledOnIndexSeries()
e, closer, reconciled := entry.ReconciledOnIndexSeries()
require.True(t, reconciled)
require.Equal(t, uint64(1), e.(*Entry).Index)
cleanup()
closer.Close()

// Set the entry's insert time emulating being inserted into the shard.
// Ensure no reconciliation.
entry.SetInsertTime(time.Now())
e, cleanup, reconciled = entry.ReconciledOnIndexSeries()
e, closer, reconciled = entry.ReconciledOnIndexSeries()
require.False(t, reconciled)
require.Equal(t, uint64(0), e.(*Entry).Index)
cleanup()
closer.Close()
}
97 changes: 53 additions & 44 deletions src/dbnode/storage/index/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ type blockMetrics struct {
queryDocsMatched tally.Histogram
aggregateSeriesMatched tally.Histogram
aggregateDocsMatched tally.Histogram
reconciledEntryOnQuery tally.Counter
entryReconciledOnQuery tally.Counter
entryUnreconciledOnQuery tally.Counter
}

func newBlockMetrics(s tally.Scope) blockMetrics {
Expand All @@ -193,11 +194,12 @@ func newBlockMetrics(s tally.Scope) blockMetrics {
"skip_type": "not-immutable",
}).Counter(segmentFreeMmap),

querySeriesMatched: s.Histogram("query-series-matched", buckets),
queryDocsMatched: s.Histogram("query-docs-matched", buckets),
aggregateSeriesMatched: s.Histogram("aggregate-series-matched", buckets),
aggregateDocsMatched: s.Histogram("aggregate-docs-matched", buckets),
reconciledEntryOnQuery: s.Counter("reconciled-entry-on-query"),
querySeriesMatched: s.Histogram("query-series-matched", buckets),
queryDocsMatched: s.Histogram("query-docs-matched", buckets),
aggregateSeriesMatched: s.Histogram("aggregate-series-matched", buckets),
aggregateDocsMatched: s.Histogram("aggregate-docs-matched", buckets),
entryReconciledOnQuery: s.Counter("entry-reconciled-on-query"),
entryUnreconciledOnQuery: s.Counter("entry-unreconciled-on-query"),
}
}

Expand Down Expand Up @@ -544,44 +546,8 @@ func (b *block) queryWithSpan(

// Ensure that the block contains any of the relevant time segments for the query range.
doc := iter.Current()
if md, ok := doc.Metadata(); ok && md.OnIndexSeries != nil {
onIndexSeries, cleanup, reconciled := md.OnIndexSeries.ReconciledOnIndexSeries()
if reconciled {
b.metrics.reconciledEntryOnQuery.Inc(1)
}

var (
inBlock bool
currentBlock = opts.StartInclusive.Truncate(b.blockSize)
endExclusive = opts.EndExclusive
minIndexed, maxIndexed = onIndexSeries.IndexedRange()
)
if maxIndexed == 0 {
// Empty range.
cleanup()
continue
}

// Narrow down the range of blocks to scan because the client could have
// queried for an arbitrary wide range.
if currentBlock.Before(minIndexed) {
currentBlock = minIndexed
}
maxIndexedExclusive := maxIndexed.Add(time.Nanosecond)
if endExclusive.After(maxIndexedExclusive) {
endExclusive = maxIndexedExclusive
}

for !inBlock && currentBlock.Before(endExclusive) {
inBlock = onIndexSeries.IndexedForBlockStart(currentBlock)
currentBlock = currentBlock.Add(b.blockSize)
}

cleanup()

if !inBlock {
continue
}
if !b.docWithinQueryRange(doc, opts) {
continue
}

batch = append(batch, doc)
Expand Down Expand Up @@ -612,6 +578,49 @@ func (b *block) queryWithSpan(
return nil
}

func (b *block) docWithinQueryRange(doc doc.Document, opts QueryOptions) bool {
md, ok := doc.Metadata()
if !ok || md.OnIndexSeries == nil {
return true
}

onIndexSeries, closer, reconciled := md.OnIndexSeries.ReconciledOnIndexSeries()
if reconciled {
b.metrics.entryReconciledOnQuery.Inc(1)
} else {
b.metrics.entryUnreconciledOnQuery.Inc(1)
}
defer closer.Close()

var (
inBlock bool
currentBlock = opts.StartInclusive.Truncate(b.blockSize)
endExclusive = opts.EndExclusive
minIndexed, maxIndexed = onIndexSeries.IndexedRange()
)
if maxIndexed == 0 {
// Empty range.
return false
}

// Narrow down the range of blocks to scan because the client could have
// queried for an arbitrary wide range.
if currentBlock.Before(minIndexed) {
currentBlock = minIndexed
}
maxIndexedExclusive := maxIndexed.Add(time.Nanosecond)
if endExclusive.After(maxIndexedExclusive) {
endExclusive = maxIndexedExclusive
}

for !inBlock && currentBlock.Before(endExclusive) {
inBlock = onIndexSeries.IndexedForBlockStart(currentBlock)
currentBlock = currentBlock.Add(b.blockSize)
}

return inBlock
}

func (b *block) closeAsync(closer io.Closer) {
if err := closer.Close(); err != nil {
// Note: This only happens if closing the readers isn't clean.
Expand Down
5 changes: 3 additions & 2 deletions src/dbnode/storage/index/block_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/x/resource"
xtime "github.com/m3db/m3/src/x/time"

"github.com/golang/mock/gomock"
Expand Down Expand Up @@ -135,6 +136,6 @@ func (m mockOnIndexSeries) IndexedRange() (xtime.UnixNano, xtime.UnixNano) {
return 0, 0
}

func (m mockOnIndexSeries) ReconciledOnIndexSeries() (doc.OnIndexSeries, doc.ReconciledOnIndexSeriesCleanupFn, bool) {
return m, func() {}, false
func (m mockOnIndexSeries) ReconciledOnIndexSeries() (doc.OnIndexSeries, resource.SimpleCloser, bool) {
return m, resource.SimpleCloserFn(func() {}), false
}
13 changes: 7 additions & 6 deletions src/dbnode/storage/index/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
"github.com/m3db/m3/src/x/resource"
"github.com/m3db/m3/src/x/tallytest"
xtime "github.com/m3db/m3/src/x/time"

Expand Down Expand Up @@ -1500,7 +1501,7 @@ func TestBlockE2EInsertQueryLimit(t *testing.T) {
h1 := doc.NewMockOnIndexSeries(ctrl)
h1.EXPECT().OnIndexFinalize(blockStart)
h1.EXPECT().OnIndexSuccess(blockStart)
h1.EXPECT().ReconciledOnIndexSeries().Return(h1, func() {}, false)
h1.EXPECT().ReconciledOnIndexSeries().Return(h1, resource.SimpleCloserFn(func() {}), false)
h1.EXPECT().IndexedRange().Return(blockStart, blockStart)
h1.EXPECT().IndexedForBlockStart(blockStart).Return(true)

Expand Down Expand Up @@ -1589,14 +1590,14 @@ func TestBlockE2EInsertAddResultsQuery(t *testing.T) {
h1 := doc.NewMockOnIndexSeries(ctrl)
h1.EXPECT().OnIndexFinalize(blockStart)
h1.EXPECT().OnIndexSuccess(blockStart)
h1.EXPECT().ReconciledOnIndexSeries().Return(h1, func() {}, false)
h1.EXPECT().ReconciledOnIndexSeries().Return(h1, resource.SimpleCloserFn(func() {}), false)
h1.EXPECT().IndexedRange().Return(blockStart, blockStart)
h1.EXPECT().IndexedForBlockStart(blockStart).Return(true)

h2 := doc.NewMockOnIndexSeries(ctrl)
h2.EXPECT().OnIndexFinalize(blockStart)
h2.EXPECT().OnIndexSuccess(blockStart)
h2.EXPECT().ReconciledOnIndexSeries().Return(h2, func() {}, false)
h2.EXPECT().ReconciledOnIndexSeries().Return(h2, resource.SimpleCloserFn(func() {}), false)
h2.EXPECT().IndexedRange().Return(blockStart, blockStart)
h2.EXPECT().IndexedForBlockStart(blockStart).Return(true)

Expand Down Expand Up @@ -1691,7 +1692,7 @@ func TestBlockE2EInsertAddResultsMergeQuery(t *testing.T) {
h1 := doc.NewMockOnIndexSeries(ctrl)
h1.EXPECT().OnIndexFinalize(blockStart)
h1.EXPECT().OnIndexSuccess(blockStart)
h1.EXPECT().ReconciledOnIndexSeries().Return(h1, func() {}, false)
h1.EXPECT().ReconciledOnIndexSeries().Return(h1, resource.SimpleCloserFn(func() {}), false)
h1.EXPECT().IndexedRange().Return(blockStart, blockStart)
h1.EXPECT().IndexedForBlockStart(blockStart).Return(true)

Expand Down Expand Up @@ -1782,7 +1783,7 @@ func TestBlockE2EInsertAddResultsQueryNarrowingBlockRange(t *testing.T) {
h1 := doc.NewMockOnIndexSeries(ctrl)
h1.EXPECT().OnIndexFinalize(blockStart)
h1.EXPECT().OnIndexSuccess(blockStart)
h1.EXPECT().ReconciledOnIndexSeries().Return(h1, func() {}, false)
h1.EXPECT().ReconciledOnIndexSeries().Return(h1, resource.SimpleCloserFn(func() {}), false)
h1.EXPECT().IndexedRange().Return(blockStart, blockStart.Add(2*blockSize))
h1.EXPECT().IndexedForBlockStart(blockStart).Return(false)
h1.EXPECT().IndexedForBlockStart(blockStart.Add(1 * blockSize)).Return(false)
Expand All @@ -1791,7 +1792,7 @@ func TestBlockE2EInsertAddResultsQueryNarrowingBlockRange(t *testing.T) {
h2 := doc.NewMockOnIndexSeries(ctrl)
h2.EXPECT().OnIndexFinalize(blockStart)
h2.EXPECT().OnIndexSuccess(blockStart)
h2.EXPECT().ReconciledOnIndexSeries().Return(h2, func() {}, false)
h2.EXPECT().ReconciledOnIndexSeries().Return(h2, resource.SimpleCloserFn(func() {}), false)
h2.EXPECT().IndexedRange().Return(xtime.UnixNano(0), xtime.UnixNano(0))

batch := NewWriteBatch(WriteBatchOptions{
Expand Down
3 changes: 2 additions & 1 deletion src/dbnode/storage/index_query_concurrent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/resource"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"
)
Expand Down Expand Up @@ -187,7 +188,7 @@ func testNamespaceIndexHighConcurrentQueries(
AnyTimes()
onIndexSeries.EXPECT().
ReconciledOnIndexSeries().
Return(onIndexSeries, func() {}, false).
Return(onIndexSeries, resource.SimpleCloserFn(func() {}), false).
AnyTimes()

batch := index.NewWriteBatch(index.WriteBatchOptions{
Expand Down
5 changes: 4 additions & 1 deletion src/dbnode/storage/index_queue_forward_write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/resource"
xsync "github.com/m3db/m3/src/x/sync"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"
Expand Down Expand Up @@ -126,7 +127,9 @@ func setupForwardIndex(
)

if !expectAggregateQuery {
lifecycle.EXPECT().ReconciledOnIndexSeries().Return(lifecycle, func() {}, false).AnyTimes()
lifecycle.EXPECT().ReconciledOnIndexSeries().Return(
lifecycle, resource.SimpleCloserFn(func() {}), false,
).AnyTimes()

lifecycle.EXPECT().IndexedRange().Return(ts, ts)
lifecycle.EXPECT().IndexedForBlockStart(ts).Return(true)
Expand Down
5 changes: 4 additions & 1 deletion src/dbnode/storage/index_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/resource"
xsync "github.com/m3db/m3/src/x/sync"
xtest "github.com/m3db/m3/src/x/test"
xtime "github.com/m3db/m3/src/x/time"
Expand Down Expand Up @@ -353,7 +354,9 @@ func setupIndex(t *testing.T,
lifecycleFns.EXPECT().IfAlreadyIndexedMarkIndexSuccessAndFinalize(gomock.Any()).Return(false)

if !expectAggregateQuery {
lifecycleFns.EXPECT().ReconciledOnIndexSeries().Return(lifecycleFns, func() {}, false)
lifecycleFns.EXPECT().ReconciledOnIndexSeries().Return(
lifecycleFns, resource.SimpleCloserFn(func() {}), false,
)
lifecycleFns.EXPECT().IndexedRange().Return(ts, ts)
lifecycleFns.EXPECT().IndexedForBlockStart(ts).Return(true)
}
Expand Down
5 changes: 3 additions & 2 deletions src/m3ninx/doc/doc_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 2 additions & 5 deletions src/m3ninx/doc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package doc

import (
"github.com/m3db/m3/src/x/resource"
xtime "github.com/m3db/m3/src/x/time"
)

Expand Down Expand Up @@ -137,9 +138,5 @@ type OnIndexSeries interface {
// a boolean value indicating whether the result came from reconciliation or not.
// Cleanup function must be called once done with the reconciled entry so that
// reader and writer counts are accurately updated.
ReconciledOnIndexSeries() (OnIndexSeries, ReconciledOnIndexSeriesCleanupFn, bool)
ReconciledOnIndexSeries() (OnIndexSeries, resource.SimpleCloser, bool)
}

// ReconciledOnIndexSeriesCleanupFn is a function for performing cleanup when
// ReconciledOnIndexSeries is called.
type ReconciledOnIndexSeriesCleanupFn func()

0 comments on commit 9fdd964

Please sign in to comment.