
Commit

[dbnode] Extend lifetime of compactable index segments for aggregate queries (#2550)
robskillington authored Aug 22, 2020
1 parent 9646a57 commit 11c1422
Showing 38 changed files with 998 additions and 484 deletions.
57 changes: 54 additions & 3 deletions src/dbnode/integration/fs_bootstrap_index_test.go
@@ -31,19 +31,66 @@ import (
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/idx"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestFilesystemBootstrapIndexWithIndexingEnabled(t *testing.T) {
testFilesystemBootstrapIndexWithIndexingEnabled(t,
testFilesystemBootstrapIndexWithIndexingEnabledOptions{})
}

// TestFilesystemBootstrapIndexWithIndexingEnabledAndCheckTickFreeMmap makes
// sure that mmap free calls occur for bootstrapped segments.
func TestFilesystemBootstrapIndexWithIndexingEnabledAndCheckTickFreeMmap(t *testing.T) {
testFilesystemBootstrapIndexWithIndexingEnabled(t,
testFilesystemBootstrapIndexWithIndexingEnabledOptions{
test: func(t *testing.T, setup TestSetup) {
var (
cancellable = context.NewCancellable()
numSegmentsBootstrapped int64
freeMmap int64
)
for _, ns := range setup.DB().Namespaces() {
idx, err := ns.Index()
require.NoError(t, err)

result, err := idx.Tick(cancellable, time.Now())
require.NoError(t, err)

numSegmentsBootstrapped += result.NumSegmentsBootstrapped
freeMmap += result.FreeMmap
}

log := setup.StorageOpts().InstrumentOptions().Logger()
log.Info("ticked namespaces",
zap.Int64("numSegmentsBootstrapped", numSegmentsBootstrapped),
zap.Int64("freeMmap", freeMmap))
require.True(t, numSegmentsBootstrapped > 0)
require.True(t, freeMmap > 0)
},
})
}

type testFilesystemBootstrapIndexWithIndexingEnabledOptions struct {
// test is an extended test to run at the end of the core bootstrap test.
test func(t *testing.T, setup TestSetup)
}

func testFilesystemBootstrapIndexWithIndexingEnabled(
t *testing.T,
testOpts testFilesystemBootstrapIndexWithIndexingEnabledOptions,
) {
if testing.Short() {
t.SkipNow() // Just skip if we're doing a short run
}

var (
blockSize = 2 * time.Hour
-	rOpts   = retention.NewOptions().SetRetentionPeriod(2 * blockSize).SetBlockSize(blockSize)
+	rOpts   = retention.NewOptions().SetRetentionPeriod(6 * blockSize).SetBlockSize(blockSize)
idxOpts = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(2 * blockSize)
nOpts = namespace.NewOptions().SetRetentionOptions(rOpts).SetIndexOptions(idxOpts)
)
@@ -87,13 +134,13 @@ func TestFilesystemBootstrapIndexWithIndexingEnabled(t *testing.T) {
IDs: []string{fooSeries.ID.String()},
Tags: fooSeries.Tags,
NumPoints: 100,
-	Start: now.Add(-blockSize),
+	Start: now.Add(-3 * blockSize),
},
{
IDs: []string{barSeries.ID.String()},
Tags: barSeries.Tags,
NumPoints: 100,
-	Start: now.Add(-blockSize),
+	Start: now.Add(-3 * blockSize),
},
{
IDs: []string{fooSeries.ID.String()},
@@ -163,4 +210,8 @@ func TestFilesystemBootstrapIndexWithIndexingEnabled(t *testing.T) {
exhaustive: true,
expected: []generate.Series{barSeries, bazSeries},
})

if testOpts.test != nil {
testOpts.test(t, setup)
}
}
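The refactor above turns the original test body into a shared helper with an optional post-bootstrap hook, which is how the free-mmap check reuses the full bootstrap setup. A minimal sketch of plugging another variant into that hook, assuming the helper and TestSetup types shown in the diff (the variant name and its assertion are hypothetical):

```go
// Hypothetical variant reusing the shared bootstrap helper above.
func TestFilesystemBootstrapIndexCustomCheck(t *testing.T) {
	testFilesystemBootstrapIndexWithIndexingEnabled(t,
		testFilesystemBootstrapIndexWithIndexingEnabledOptions{
			// The hook runs after the core bootstrap assertions, with the
			// fully bootstrapped TestSetup still available for inspection.
			test: func(t *testing.T, setup TestSetup) {
				require.NotNil(t, setup.DB())
			},
		})
}
```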
102 changes: 91 additions & 11 deletions src/dbnode/integration/index_single_node_high_concurrency_test.go
@@ -25,13 +25,17 @@ package integration
import (
"fmt"
"math/rand"
"strconv"
"sync"
"testing"
"time"

"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/m3ninx/idx"
xclock "github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
xsync "github.com/m3db/m3/src/x/sync"
xtime "github.com/m3db/m3/src/x/time"
@@ -88,15 +92,39 @@ func TestIndexSingleNodeHighConcurrencyFewTagsHighCardinalityQueryDuringWrites(t
}

testIndexSingleNodeHighConcurrency(t, testIndexHighConcurrencyOptions{
-	concurrencyEnqueueWorker:     8,
-	concurrencyWrites:            5000,
-	enqueuePerWorker:             100000,
-	numTags:                      2,
-	concurrencyQueryDuringWrites: 16,
-	skipVerify:                   true,
+	concurrencyEnqueueWorker:         8,
+	concurrencyWrites:                5000,
+	enqueuePerWorker:                 100000,
+	numTags:                          2,
+	concurrencyQueryDuringWrites:     16,
+	concurrencyQueryDuringWritesType: indexQuery,
+	skipVerify:                       true,
})
}

func TestIndexSingleNodeHighConcurrencyFewTagsHighCardinalityAggregateQueryDuringWrites(t *testing.T) {
if testing.Short() {
t.SkipNow() // Just skip if we're doing a short run
}

testIndexSingleNodeHighConcurrency(t, testIndexHighConcurrencyOptions{
concurrencyEnqueueWorker: 8,
concurrencyWrites: 5000,
enqueuePerWorker: 100000,
numTags: 2,
concurrencyQueryDuringWrites: 1,
concurrencyQueryDuringWritesType: indexAggregateQuery,
skipVerify: true,
})
}

type queryType uint

const (
indexQuery queryType = iota
indexAggregateQuery
)

type testIndexHighConcurrencyOptions struct {
concurrencyEnqueueWorker int
concurrencyWrites int
@@ -111,6 +139,10 @@ type testIndexHighConcurrencyOptions struct {
// are performing writes.
concurrencyQueryDuringWrites int

// concurrencyQueryDuringWritesType determines the type of queries
// to issue while performing writes.
concurrencyQueryDuringWritesType queryType

// skipVerify will skip verifying the actual series were indexed,
// which is useful when just sanity checking that we can write/read
// concurrently without errors and that the stats look good.
@@ -216,12 +248,15 @@ func testIndexSingleNodeHighConcurrency(

// If concurrent query load is enabled, also hit the node with queries while writing.
queryConcDuringWritesCloseCh := make(chan struct{}, 1)
numTotalQueryMatches := atomic.NewUint32(0)
numTotalQueryErrors := atomic.NewUint32(0)
checkNumTotalQueryMatches := false
if opts.concurrencyQueryDuringWrites == 0 {
log.Info("no concurrent queries during writes configured")
} else {
log.Info("starting concurrent queries during writes",
zap.Int("concurrency", opts.concurrencyQueryDuringWrites))
checkNumTotalQueryMatches = true
for i := 0; i < opts.concurrencyQueryDuringWrites; i++ {
go func() {
src := rand.NewSource(int64(i))
@@ -231,16 +266,55 @@
case <-queryConcDuringWritesCloseCh:
return
default:
}

switch opts.concurrencyQueryDuringWritesType {
case indexQuery:
randI := rng.Intn(opts.concurrencyEnqueueWorker)
randJ := rng.Intn(opts.enqueuePerWorker)
id, tags := genIDTags(randI, randJ, opts.numTags)
-	_, err := isIndexedChecked(t, session, md.ID(), id, tags)
+	ok, err := isIndexedChecked(t, session, md.ID(), id, tags)
if err != nil {
if n := numTotalQueryErrors.Inc(); n < 10 {
// Log the first 10 errors for visibility, but don't flood.
log.Error("sampled query error", zap.Error(err))
}
}
if ok {
numTotalQueryMatches.Inc()
}
case indexAggregateQuery:
randI := rng.Intn(opts.concurrencyEnqueueWorker)
match := idx.NewTermQuery([]byte("common_i"), []byte(strconv.Itoa(randI)))
q := index.Query{Query: match}

now := time.Now()
qOpts := index.AggregationOptions{
QueryOptions: index.QueryOptions{
StartInclusive: now.Add(-md.Options().RetentionOptions().RetentionPeriod()),
EndExclusive: now,
DocsLimit: 1000,
},
}

ctx := context.NewContext()
r, err := testSetup.DB().AggregateQuery(ctx, md.ID(), q, qOpts)
if err != nil {
panic(err)
}

tagValues := 0
for _, entry := range r.Results.Map().Iter() {
values := entry.Value()
tagValues += values.Size()
}

// Done with resources, return to pool.
ctx.Close()

numTotalQueryMatches.Add(uint32(tagValues))
default:
panic("unknown query type")
}
}
}()
@@ -253,10 +327,16 @@ func testIndexSingleNodeHighConcurrency(
// Check no write errors.
require.Equal(t, int(0), int(numTotalErrors.Load()))

if checkNumTotalQueryMatches {
// Check matches.
require.True(t, numTotalQueryMatches.Load() > 0, "no query matches")
}

log.Info("test data written",
zap.Duration("took", time.Since(start)),
zap.Int("written", int(numTotalSuccess.Load())),
zap.Time("serverTime", nowFn()))
zap.Time("serverTime", nowFn()),
zap.Uint32("queryMatches", numTotalQueryMatches.Load()))

log.Info("data indexing verify start")

@@ -331,13 +411,13 @@ func testIndexSingleNodeHighConcurrency(
}()
}
fetchWg.Wait()
log.Info("data indexing verify done",
zap.Int("notIndexed", len(notIndexedErrs)),
zap.Duration("took", time.Since(start)))

require.Equal(t, 0, len(notIndexedErrs),
fmt.Sprintf("not indexed errors: %v", notIndexedErrs[:min(5, len(notIndexedErrs))]))
}

log.Info("data indexing verify done", zap.Duration("took", time.Since(start)))

// Make sure attempted total indexing = skipped + written.
counters = testSetup.Scope().Snapshot().Counters()
totalSkippedWritten := 0
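Distilled from the test above, a self-contained sketch of the aggregate-query path the new test exercises while writes are in flight; it uses only calls that appear in the diff, though the helper name and the hard-coded term value are illustrative:

```go
// aggregateTagValues issues a term aggregate query against the embedded DB
// and counts the distinct tag values returned (illustrative helper).
func aggregateTagValues(testSetup TestSetup, md namespace.Metadata) (int, error) {
	q := index.Query{Query: idx.NewTermQuery([]byte("common_i"), []byte("0"))}

	now := time.Now()
	qOpts := index.AggregationOptions{
		QueryOptions: index.QueryOptions{
			StartInclusive: now.Add(-md.Options().RetentionOptions().RetentionPeriod()),
			EndExclusive:   now,
			DocsLimit:      1000,
		},
	}

	// Query contexts are pooled; close to return resources when done.
	ctx := context.NewContext()
	defer ctx.Close()

	r, err := testSetup.DB().AggregateQuery(ctx, md.ID(), q, qOpts)
	if err != nil {
		return 0, err
	}

	tagValues := 0
	for _, entry := range r.Results.Map().Iter() {
		tagValues += entry.Value().Size()
	}
	return tagValues, nil
}
```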
36 changes: 20 additions & 16 deletions src/dbnode/storage/bootstrap/bootstrapper/fs/source.go
@@ -40,7 +40,6 @@ import (
"github.com/m3db/m3/src/dbnode/tracepoint"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/m3ninx/doc"
"github.com/m3db/m3/src/m3ninx/index/segment"
idxpersist "github.com/m3db/m3/src/m3ninx/persist"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/context"
@@ -485,19 +484,6 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(

remainingMin, remainingMax := remainingRanges.MinMax()
fulfilledMin, fulfilledMax := totalFulfilledRanges.MinMax()
-	buildIndexLogFields := []zapcore.Field{
-		zap.Stringer("namespace", ns.ID()),
-		zap.Bool("shouldBuildSegment", shouldBuildSegment),
-		zap.Bool("noneRemaining", noneRemaining),
-		zap.Bool("overlapsWithInitalIndexRange", overlapsWithInitalIndexRange),
-		zap.Int("totalEntries", totalEntries),
-		zap.String("requestedRangesMinMax", fmt.Sprintf("%v - %v", min, max)),
-		zap.String("remainingRangesMinMax", fmt.Sprintf("%v - %v", remainingMin, remainingMax)),
-		zap.String("remainingRanges", remainingRanges.SummaryString()),
-		zap.String("totalFulfilledRangesMinMax", fmt.Sprintf("%v - %v", fulfilledMin, fulfilledMax)),
-		zap.String("totalFulfilledRanges", totalFulfilledRanges.SummaryString()),
-		zap.String("initialIndexRange", fmt.Sprintf("%v - %v", initialIndexRange.Start, initialIndexRange.End)),
-	}

// NB(bodu): Assume if we're bootstrapping data from disk that it is the "default" index volume type.
existingIndexBlock, ok := bootstrapper.GetDefaultIndexBlockForBlockStart(runResult.index.IndexResults(), blockStart)
@@ -516,8 +502,26 @@ func (s *fileSystemSource) loadShardReadersDataIntoShardResult(
persistCfg := runOpts.PersistConfig()
shouldFlush := persistCfg.Enabled &&
persistCfg.FileSetType == persist.FileSetFlushType

+	// Determine all requested ranges were fulfilled or at edge of retention
+	satisifiedFlushRanges := noneRemaining || overlapsWithInitalIndexRange
+
+	buildIndexLogFields := []zapcore.Field{
+		zap.Stringer("namespace", ns.ID()),
+		zap.Bool("shouldBuildSegment", shouldBuildSegment),
+		zap.Bool("noneRemaining", noneRemaining),
+		zap.Bool("overlapsWithInitalIndexRange", overlapsWithInitalIndexRange),
+		zap.Int("totalEntries", totalEntries),
+		zap.String("requestedRangesMinMax", fmt.Sprintf("%v - %v", min, max)),
+		zap.String("remainingRangesMinMax", fmt.Sprintf("%v - %v", remainingMin, remainingMax)),
+		zap.String("remainingRanges", remainingRanges.SummaryString()),
+		zap.String("totalFulfilledRangesMinMax", fmt.Sprintf("%v - %v", fulfilledMin, fulfilledMax)),
+		zap.String("totalFulfilledRanges", totalFulfilledRanges.SummaryString()),
+		zap.String("initialIndexRange", fmt.Sprintf("%v - %v", initialIndexRange.Start, initialIndexRange.End)),
+		zap.Bool("shouldFlush", shouldFlush),
+		zap.Bool("satisifiedFlushRanges", satisifiedFlushRanges),
+	}

if shouldFlush && satisifiedFlushRanges {
s.log.Debug("building file set index segment", buildIndexLogFields...)
indexBlock, err = bootstrapper.PersistBootstrapIndexSegment(
@@ -891,9 +895,9 @@ func (s *fileSystemSource) bootstrapFromIndexPersistedBlocks(
}
segmentsFulfilled := willFulfill
// NB(bodu): All segments read from disk are already persisted.
-	persistedSegments := make([]segment.Segment, 0, len(segments))
+	persistedSegments := make([]result.Segment, 0, len(segments))
for _, segment := range segments {
-	persistedSegments = append(persistedSegments, bootstrapper.NewSegment(segment, true))
+	persistedSegments = append(persistedSegments, result.NewSegment(segment, true))
}
volumeType := idxpersist.DefaultIndexVolumeType
if info.IndexVolumeType != nil {
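Condensed from the source.go changes above: the flush decision now also requires that the requested index ranges were satisfied, so segments that cannot be flushed stay in memory, where they remain compactable. A sketch of that decision under the same assumptions as the diff (the wrapper function is illustrative; the real logic lives in loadShardReadersDataIntoShardResult):

```go
// buildOrPersistSegment sketches the bootstrap flush decision (illustrative).
func buildOrPersistSegment(persistEnabled, flushFileSet, noneRemaining, overlapsInitialIndexRange bool) string {
	// Flushing must be enabled and targeting flush-type filesets.
	shouldFlush := persistEnabled && flushFileSet
	// All requested ranges were fulfilled, or the block sits at the edge of
	// retention and overlaps the initial index range.
	satisfiedFlushRanges := noneRemaining || overlapsInitialIndexRange

	if shouldFlush && satisfiedFlushRanges {
		return "build and persist file set index segment"
	}
	return "build in-memory (compactable) index segment"
}
```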