diff --git a/src/dbnode/storage/entry.go b/src/dbnode/storage/entry.go index 160f8b63d3..87a4ea8c7d 100644 --- a/src/dbnode/storage/entry.go +++ b/src/dbnode/storage/entry.go @@ -36,6 +36,7 @@ import ( "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" ) @@ -156,19 +157,19 @@ func (entry *Entry) IndexedRange() (xtime.UnixNano, xtime.UnixNano) { // is an error during retrieval, simply returns the current entry. Additionally, // returns a cleanup function to run once finished using the reconciled entry and // a boolean value indicating whether the result came from reconciliation or not. -func (entry *Entry) ReconciledOnIndexSeries() (doc.OnIndexSeries, doc.ReconciledOnIndexSeriesCleanupFn, bool) { +func (entry *Entry) ReconciledOnIndexSeries() (doc.OnIndexSeries, resource.SimpleCloser, bool) { if entry.insertTime.Load() > 0 { - return entry, func() {}, false + return entry, resource.SimpleCloserFn(func() {}), false } e, _, err := entry.Shard.TryRetrieveSeriesAndIncrementReaderWriterCount(entry.ID) if err != nil || e == nil { - return entry, func() {}, false + return entry, resource.SimpleCloserFn(func() {}), false } - return e, func() { + return e, resource.SimpleCloserFn(func() { e.DecrementReaderWriterCount() - }, true + }), true } // NeedsIndexUpdate returns a bool to indicate if the Entry needs to be indexed diff --git a/src/dbnode/storage/entry_blackbox_test.go b/src/dbnode/storage/entry_blackbox_test.go index 2537cc6985..fbccf010ca 100644 --- a/src/dbnode/storage/entry_blackbox_test.go +++ b/src/dbnode/storage/entry_blackbox_test.go @@ -234,16 +234,16 @@ func TestReconciledOnIndexSeries(t *testing.T) { _ = addMockSeries(ctrl, shard, series.ID(), ident.Tags{}, 1) // Validate we perform the reconciliation. - e, cleanup, reconciled := entry.ReconciledOnIndexSeries() + e, closer, reconciled := entry.ReconciledOnIndexSeries() require.True(t, reconciled) require.Equal(t, uint64(1), e.(*Entry).Index) - cleanup() + closer.Close() // Set the entry's insert time emulating being inserted into the shard. // Ensure no reconciliation. entry.SetInsertTime(time.Now()) - e, cleanup, reconciled = entry.ReconciledOnIndexSeries() + e, closer, reconciled = entry.ReconciledOnIndexSeries() require.False(t, reconciled) require.Equal(t, uint64(0), e.(*Entry).Index) - cleanup() + closer.Close() } diff --git a/src/dbnode/storage/index/block.go b/src/dbnode/storage/index/block.go index d8ae7ff8ee..292151f759 100644 --- a/src/dbnode/storage/index/block.go +++ b/src/dbnode/storage/index/block.go @@ -170,7 +170,8 @@ type blockMetrics struct { queryDocsMatched tally.Histogram aggregateSeriesMatched tally.Histogram aggregateDocsMatched tally.Histogram - reconciledEntryOnQuery tally.Counter + entryReconciledOnQuery tally.Counter + entryUnreconciledOnQuery tally.Counter } func newBlockMetrics(s tally.Scope) blockMetrics { @@ -193,11 +194,12 @@ func newBlockMetrics(s tally.Scope) blockMetrics { "skip_type": "not-immutable", }).Counter(segmentFreeMmap), - querySeriesMatched: s.Histogram("query-series-matched", buckets), - queryDocsMatched: s.Histogram("query-docs-matched", buckets), - aggregateSeriesMatched: s.Histogram("aggregate-series-matched", buckets), - aggregateDocsMatched: s.Histogram("aggregate-docs-matched", buckets), - reconciledEntryOnQuery: s.Counter("reconciled-entry-on-query"), + querySeriesMatched: s.Histogram("query-series-matched", buckets), + queryDocsMatched: s.Histogram("query-docs-matched", buckets), + aggregateSeriesMatched: s.Histogram("aggregate-series-matched", buckets), + aggregateDocsMatched: s.Histogram("aggregate-docs-matched", buckets), + entryReconciledOnQuery: s.Counter("entry-reconciled-on-query"), + entryUnreconciledOnQuery: s.Counter("entry-unreconciled-on-query"), } } @@ -544,44 +546,8 @@ func (b *block) queryWithSpan( // Ensure that the block contains any of the relevant time segments for the query range. doc := iter.Current() - if md, ok := doc.Metadata(); ok && md.OnIndexSeries != nil { - onIndexSeries, cleanup, reconciled := md.OnIndexSeries.ReconciledOnIndexSeries() - if reconciled { - b.metrics.reconciledEntryOnQuery.Inc(1) - } - - var ( - inBlock bool - currentBlock = opts.StartInclusive.Truncate(b.blockSize) - endExclusive = opts.EndExclusive - minIndexed, maxIndexed = onIndexSeries.IndexedRange() - ) - if maxIndexed == 0 { - // Empty range. - cleanup() - continue - } - - // Narrow down the range of blocks to scan because the client could have - // queried for an arbitrary wide range. - if currentBlock.Before(minIndexed) { - currentBlock = minIndexed - } - maxIndexedExclusive := maxIndexed.Add(time.Nanosecond) - if endExclusive.After(maxIndexedExclusive) { - endExclusive = maxIndexedExclusive - } - - for !inBlock && currentBlock.Before(endExclusive) { - inBlock = onIndexSeries.IndexedForBlockStart(currentBlock) - currentBlock = currentBlock.Add(b.blockSize) - } - - cleanup() - - if !inBlock { - continue - } + if !b.docWithinQueryRange(doc, opts) { + continue } batch = append(batch, doc) @@ -612,6 +578,49 @@ func (b *block) queryWithSpan( return nil } +func (b *block) docWithinQueryRange(doc doc.Document, opts QueryOptions) bool { + md, ok := doc.Metadata() + if !ok || md.OnIndexSeries == nil { + return true + } + + onIndexSeries, closer, reconciled := md.OnIndexSeries.ReconciledOnIndexSeries() + if reconciled { + b.metrics.entryReconciledOnQuery.Inc(1) + } else { + b.metrics.entryUnreconciledOnQuery.Inc(1) + } + defer closer.Close() + + var ( + inBlock bool + currentBlock = opts.StartInclusive.Truncate(b.blockSize) + endExclusive = opts.EndExclusive + minIndexed, maxIndexed = onIndexSeries.IndexedRange() + ) + if maxIndexed == 0 { + // Empty range. + return false + } + + // Narrow down the range of blocks to scan because the client could have + // queried for an arbitrary wide range. + if currentBlock.Before(minIndexed) { + currentBlock = minIndexed + } + maxIndexedExclusive := maxIndexed.Add(time.Nanosecond) + if endExclusive.After(maxIndexedExclusive) { + endExclusive = maxIndexedExclusive + } + + for !inBlock && currentBlock.Before(endExclusive) { + inBlock = onIndexSeries.IndexedForBlockStart(currentBlock) + currentBlock = currentBlock.Add(b.blockSize) + } + + return inBlock +} + func (b *block) closeAsync(closer io.Closer) { if err := closer.Close(); err != nil { // Note: This only happens if closing the readers isn't clean. diff --git a/src/dbnode/storage/index/block_bench_test.go b/src/dbnode/storage/index/block_bench_test.go index b6d93aa325..a893653be7 100644 --- a/src/dbnode/storage/index/block_bench_test.go +++ b/src/dbnode/storage/index/block_bench_test.go @@ -30,6 +30,7 @@ import ( "github.com/m3db/m3/src/dbnode/namespace" "github.com/m3db/m3/src/m3ninx/doc" + "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" @@ -135,6 +136,6 @@ func (m mockOnIndexSeries) IndexedRange() (xtime.UnixNano, xtime.UnixNano) { return 0, 0 } -func (m mockOnIndexSeries) ReconciledOnIndexSeries() (doc.OnIndexSeries, doc.ReconciledOnIndexSeriesCleanupFn, bool) { - return m, func() {}, false +func (m mockOnIndexSeries) ReconciledOnIndexSeries() (doc.OnIndexSeries, resource.SimpleCloser, bool) { + return m, resource.SimpleCloserFn(func() {}), false } diff --git a/src/dbnode/storage/index/block_test.go b/src/dbnode/storage/index/block_test.go index 0c44c4cfff..0dceb11b24 100644 --- a/src/dbnode/storage/index/block_test.go +++ b/src/dbnode/storage/index/block_test.go @@ -45,6 +45,7 @@ import ( "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" + "github.com/m3db/m3/src/x/resource" "github.com/m3db/m3/src/x/tallytest" xtime "github.com/m3db/m3/src/x/time" @@ -1500,7 +1501,7 @@ func TestBlockE2EInsertQueryLimit(t *testing.T) { h1 := doc.NewMockOnIndexSeries(ctrl) h1.EXPECT().OnIndexFinalize(blockStart) h1.EXPECT().OnIndexSuccess(blockStart) - h1.EXPECT().ReconciledOnIndexSeries().Return(h1, func() {}, false) + h1.EXPECT().ReconciledOnIndexSeries().Return(h1, resource.SimpleCloserFn(func() {}), false) h1.EXPECT().IndexedRange().Return(blockStart, blockStart) h1.EXPECT().IndexedForBlockStart(blockStart).Return(true) @@ -1589,14 +1590,14 @@ func TestBlockE2EInsertAddResultsQuery(t *testing.T) { h1 := doc.NewMockOnIndexSeries(ctrl) h1.EXPECT().OnIndexFinalize(blockStart) h1.EXPECT().OnIndexSuccess(blockStart) - h1.EXPECT().ReconciledOnIndexSeries().Return(h1, func() {}, false) + h1.EXPECT().ReconciledOnIndexSeries().Return(h1, resource.SimpleCloserFn(func() {}), false) h1.EXPECT().IndexedRange().Return(blockStart, blockStart) h1.EXPECT().IndexedForBlockStart(blockStart).Return(true) h2 := doc.NewMockOnIndexSeries(ctrl) h2.EXPECT().OnIndexFinalize(blockStart) h2.EXPECT().OnIndexSuccess(blockStart) - h2.EXPECT().ReconciledOnIndexSeries().Return(h2, func() {}, false) + h2.EXPECT().ReconciledOnIndexSeries().Return(h2, resource.SimpleCloserFn(func() {}), false) h2.EXPECT().IndexedRange().Return(blockStart, blockStart) h2.EXPECT().IndexedForBlockStart(blockStart).Return(true) @@ -1691,7 +1692,7 @@ func TestBlockE2EInsertAddResultsMergeQuery(t *testing.T) { h1 := doc.NewMockOnIndexSeries(ctrl) h1.EXPECT().OnIndexFinalize(blockStart) h1.EXPECT().OnIndexSuccess(blockStart) - h1.EXPECT().ReconciledOnIndexSeries().Return(h1, func() {}, false) + h1.EXPECT().ReconciledOnIndexSeries().Return(h1, resource.SimpleCloserFn(func() {}), false) h1.EXPECT().IndexedRange().Return(blockStart, blockStart) h1.EXPECT().IndexedForBlockStart(blockStart).Return(true) @@ -1782,7 +1783,7 @@ func TestBlockE2EInsertAddResultsQueryNarrowingBlockRange(t *testing.T) { h1 := doc.NewMockOnIndexSeries(ctrl) h1.EXPECT().OnIndexFinalize(blockStart) h1.EXPECT().OnIndexSuccess(blockStart) - h1.EXPECT().ReconciledOnIndexSeries().Return(h1, func() {}, false) + h1.EXPECT().ReconciledOnIndexSeries().Return(h1, resource.SimpleCloserFn(func() {}), false) h1.EXPECT().IndexedRange().Return(blockStart, blockStart.Add(2*blockSize)) h1.EXPECT().IndexedForBlockStart(blockStart).Return(false) h1.EXPECT().IndexedForBlockStart(blockStart.Add(1 * blockSize)).Return(false) @@ -1791,7 +1792,7 @@ func TestBlockE2EInsertAddResultsQueryNarrowingBlockRange(t *testing.T) { h2 := doc.NewMockOnIndexSeries(ctrl) h2.EXPECT().OnIndexFinalize(blockStart) h2.EXPECT().OnIndexSuccess(blockStart) - h2.EXPECT().ReconciledOnIndexSeries().Return(h2, func() {}, false) + h2.EXPECT().ReconciledOnIndexSeries().Return(h2, resource.SimpleCloserFn(func() {}), false) h2.EXPECT().IndexedRange().Return(xtime.UnixNano(0), xtime.UnixNano(0)) batch := NewWriteBatch(WriteBatchOptions{ diff --git a/src/dbnode/storage/index_query_concurrent_test.go b/src/dbnode/storage/index_query_concurrent_test.go index 9b8b6ea29a..3ff9b2395c 100644 --- a/src/dbnode/storage/index_query_concurrent_test.go +++ b/src/dbnode/storage/index_query_concurrent_test.go @@ -45,6 +45,7 @@ import ( "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" "github.com/m3db/m3/src/x/instrument" + "github.com/m3db/m3/src/x/resource" xtest "github.com/m3db/m3/src/x/test" xtime "github.com/m3db/m3/src/x/time" ) @@ -187,7 +188,7 @@ func testNamespaceIndexHighConcurrentQueries( AnyTimes() onIndexSeries.EXPECT(). ReconciledOnIndexSeries(). - Return(onIndexSeries, func() {}, false). + Return(onIndexSeries, resource.SimpleCloserFn(func() {}), false). AnyTimes() batch := index.NewWriteBatch(index.WriteBatchOptions{ diff --git a/src/dbnode/storage/index_queue_forward_write_test.go b/src/dbnode/storage/index_queue_forward_write_test.go index 92f8f1bd4e..e8dcf8ec02 100644 --- a/src/dbnode/storage/index_queue_forward_write_test.go +++ b/src/dbnode/storage/index_queue_forward_write_test.go @@ -39,6 +39,7 @@ import ( "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/resource" xsync "github.com/m3db/m3/src/x/sync" xtest "github.com/m3db/m3/src/x/test" xtime "github.com/m3db/m3/src/x/time" @@ -126,7 +127,9 @@ func setupForwardIndex( ) if !expectAggregateQuery { - lifecycle.EXPECT().ReconciledOnIndexSeries().Return(lifecycle, func() {}, false).AnyTimes() + lifecycle.EXPECT().ReconciledOnIndexSeries().Return( + lifecycle, resource.SimpleCloserFn(func() {}), false, + ).AnyTimes() lifecycle.EXPECT().IndexedRange().Return(ts, ts) lifecycle.EXPECT().IndexedForBlockStart(ts).Return(true) diff --git a/src/dbnode/storage/index_queue_test.go b/src/dbnode/storage/index_queue_test.go index 2db743cd20..5783675492 100644 --- a/src/dbnode/storage/index_queue_test.go +++ b/src/dbnode/storage/index_queue_test.go @@ -36,6 +36,7 @@ import ( "github.com/m3db/m3/src/x/clock" "github.com/m3db/m3/src/x/context" "github.com/m3db/m3/src/x/ident" + "github.com/m3db/m3/src/x/resource" xsync "github.com/m3db/m3/src/x/sync" xtest "github.com/m3db/m3/src/x/test" xtime "github.com/m3db/m3/src/x/time" @@ -353,7 +354,9 @@ func setupIndex(t *testing.T, lifecycleFns.EXPECT().IfAlreadyIndexedMarkIndexSuccessAndFinalize(gomock.Any()).Return(false) if !expectAggregateQuery { - lifecycleFns.EXPECT().ReconciledOnIndexSeries().Return(lifecycleFns, func() {}, false) + lifecycleFns.EXPECT().ReconciledOnIndexSeries().Return( + lifecycleFns, resource.SimpleCloserFn(func() {}), false, + ) lifecycleFns.EXPECT().IndexedRange().Return(ts, ts) lifecycleFns.EXPECT().IndexedForBlockStart(ts).Return(true) } diff --git a/src/m3ninx/doc/doc_mock.go b/src/m3ninx/doc/doc_mock.go index 1a8fc04909..2b7b0e2950 100644 --- a/src/m3ninx/doc/doc_mock.go +++ b/src/m3ninx/doc/doc_mock.go @@ -27,6 +27,7 @@ package doc import ( "reflect" + "github.com/m3db/m3/src/x/resource" "github.com/m3db/m3/src/x/time" "github.com/golang/mock/gomock" @@ -414,11 +415,11 @@ func (mr *MockOnIndexSeriesMockRecorder) OnIndexSuccess(blockStart interface{}) } // ReconciledOnIndexSeries mocks base method. -func (m *MockOnIndexSeries) ReconciledOnIndexSeries() (OnIndexSeries, ReconciledOnIndexSeriesCleanupFn, bool) { +func (m *MockOnIndexSeries) ReconciledOnIndexSeries() (OnIndexSeries, resource.SimpleCloser, bool) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ReconciledOnIndexSeries") ret0, _ := ret[0].(OnIndexSeries) - ret1, _ := ret[1].(ReconciledOnIndexSeriesCleanupFn) + ret1, _ := ret[1].(resource.SimpleCloser) ret2, _ := ret[2].(bool) return ret0, ret1, ret2 } diff --git a/src/m3ninx/doc/types.go b/src/m3ninx/doc/types.go index cae4365f7a..90db2b2003 100644 --- a/src/m3ninx/doc/types.go +++ b/src/m3ninx/doc/types.go @@ -21,6 +21,7 @@ package doc import ( + "github.com/m3db/m3/src/x/resource" xtime "github.com/m3db/m3/src/x/time" ) @@ -137,9 +138,5 @@ type OnIndexSeries interface { // a boolean value indicating whether the result came from reconciliation or not. // Cleanup function must be called once done with the reconciled entry so that // reader and writer counts are accurately updated. - ReconciledOnIndexSeries() (OnIndexSeries, ReconciledOnIndexSeriesCleanupFn, bool) + ReconciledOnIndexSeries() (OnIndexSeries, resource.SimpleCloser, bool) } - -// ReconciledOnIndexSeriesCleanupFn is a function for performing cleanup when -// ReconciledOnIndexSeries is called. -type ReconciledOnIndexSeriesCleanupFn func()