[dbnode] Use active index block which GCs expired series instead of explicit block rotations #3464

Merged
merged 74 commits into from
Aug 26, 2021
Changes from 51 commits
Commits
7d21f7d
[dbnode] Use active block which GCs series instead of explicit block …
robskillington May 5, 2021
fde5188
Fix supplying RelookupAndIncrementReaderWriterCount
robskillington May 5, 2021
2fe5644
Always write to active block
robskillington May 7, 2021
2b167e7
Set attempt to true when calling OnIndexPrepare
robskillington May 9, 2021
10cd227
Do not expire from the shard until no longer indexed any block starts
robskillington May 9, 2021
8270d5c
Check query range against block state
rallen090 Jun 11, 2021
951b420
Remove print
rallen090 Jun 11, 2021
5e2e22d
Allow no metadata
rallen090 Jun 11, 2021
d23c8fc
Fix test
rallen090 Jun 11, 2021
f1222d5
Only consider blocks both sealed and in-mem data evicted as flushed b…
robskillington Jul 1, 2021
f29148d
Rebased
rallen090 Jul 7, 2021
341c8ef
Fix unit test 1
rallen090 Jul 8, 2021
3bdfc8d
Fix unit test 2
rallen090 Jul 8, 2021
33194dc
Add debug info to integration test
rallen090 Jul 8, 2021
b9a0f32
Fix integration test
rallen090 Jul 8, 2021
612da98
Fix integration test for rotation
rallen090 Jul 8, 2021
7623cd4
More unit test fixes
rallen090 Jul 8, 2021
0869a68
Big unit test fixes
rallen090 Jul 8, 2021
12045e2
More unit test fixes
rallen090 Jul 9, 2021
2d37a0f
More unit test fixes 2
rallen090 Jul 9, 2021
3064b1c
More unit test fixes 3
rallen090 Jul 9, 2021
e7dc197
More unit test fixes
rallen090 Jul 13, 2021
b288ef0
Fix test TestNamespaceIndexTick
rallen090 Jul 13, 2021
7fd4eb9
Fix test TestLimits
rallen090 Jul 13, 2021
bbe332a
Fix test TestNamespaceForwardIndexInsertQuery
rallen090 Jul 13, 2021
b9930a1
Fix test TestNamespaceIndexForwardWrite
rallen090 Jul 13, 2021
5d9aee4
Fix test TestNamespaceIndexInsertQuery
rallen090 Jul 13, 2021
75768fe
Fix test TestNamespaceIndexInsertQuery
rallen090 Jul 13, 2021
aecdc56
Fix test TestShardAsyncInsertMarkIndexedForBlockStart
rallen090 Jul 13, 2021
9f4ff04
Fix test race condition
rallen090 Jul 13, 2021
9d0bf10
Fix test TestNamespaceIndexBlockQueryReleasingContext
rallen090 Jul 13, 2021
88229ac
Fix test TestNamespaceIndexHighConcurrentQueriesWithoutTimeouts
rallen090 Jul 14, 2021
97d534e
Fix test index_query_concurrent_test
rallen090 Jul 14, 2021
6eb4716
Fix test mock order in TestNamespaceIndexBlockQueryReleasingContext
rallen090 Jul 14, 2021
2d686d0
Fix test mock order in TestNamespaceIndexBlockQueryReleasingContext 2
rallen090 Jul 14, 2021
f1ec6ed
Gen
rallen090 Jul 14, 2021
bc8fb74
Doc update from master
rallen090 Jul 14, 2021
4d09a99
Lint 1
rallen090 Jul 14, 2021
f22f91e
Lint 2
rallen090 Jul 14, 2021
8e6241a
Lint 3
rallen090 Jul 14, 2021
8564260
Merge remote-tracking branch 'origin/master' into r/index-active-block
rallen090 Jul 14, 2021
f723d6f
Rebased
rallen090 Jul 14, 2021
6878e08
Rebased CI
rallen090 Jul 14, 2021
9294049
Gen
rallen090 Jul 14, 2021
ed89e86
Gen
rallen090 Jul 14, 2021
837131e
Add metric
rallen090 Jul 21, 2021
1a5362c
Merge remote-tracking branch 'origin/master' into r/index-active-block
rallen090 Jul 21, 2021
2df8748
Fix for index block appearing as a flushed index block, now explicit …
robskillington Jul 26, 2021
508327c
Fix considering block as flushed based on result from tick (check blo…
robskillington Jul 27, 2021
c815135
Merge remote-tracking branch 'origin/master' into r/index-active-block
rallen090 Aug 3, 2021
926a478
Check entry empty state to ensure GC eligible (#3634)
rallen090 Aug 19, 2021
957c743
Correctly decrement reader/writer on entry and misc cleanup
rallen090 Aug 20, 2021
9ebdead
Mock
rallen090 Aug 20, 2021
1adcf6d
Merge remote-tracking branch 'origin/master' into r/index-active-block
rallen090 Aug 20, 2021
8034ad6
Check series indexed along with isEmpty
rallen090 Aug 24, 2021
ca78fb5
Merge remote-tracking branch 'origin/master' into r/index-active-block
rallen090 Aug 24, 2021
06d62f1
Remove IndexedOrAttemptedAny
rallen090 Aug 25, 2021
12baa60
[dbnode] Check ref count when considering if a series is empty (#3694)
rallen090 Aug 25, 2021
7075595
Merge remote-tracking branch 'origin/master' into r/index-active-block
rallen090 Aug 25, 2021
30c753c
Fixing tests
rallen090 Aug 25, 2021
a6558fa
Add check for ID is nil
rallen090 Aug 25, 2021
2978a79
Add check for ID is nil 2
rallen090 Aug 25, 2021
c9d9c4f
Merge remote-tracking branch 'origin/master' into r/index-active-block
rallen090 Aug 25, 2021
bd4169d
Add IndexGarbageCollected
rallen090 Aug 26, 2021
4282709
Fix CI
rallen090 Aug 26, 2021
d83522c
More test fixes
rallen090 Aug 26, 2021
cd21e4a
Test fixes
rallen090 Aug 26, 2021
089e80c
Lint
rallen090 Aug 26, 2021
51501f8
Merge conflict
rallen090 Aug 26, 2021
b420578
Check if segments require background GC before compacting (#3695)
robskillington Aug 26, 2021
6b21b4e
Fix tests
rallen090 Aug 26, 2021
5131acd
Lint
rallen090 Aug 26, 2021
ad10d33
Fix active block test
rallen090 Aug 26, 2021
7cbbec9
Mock
rallen090 Aug 26, 2021
2 changes: 1 addition & 1 deletion site/content/overview/components.md
@@ -17,4 +17,4 @@ M3DB is a distributed time series database that provides scalable storage and a

## M3 Aggregator

-{{< fileinclude file="m3aggregator_intro.md" >}}
+{{< fileinclude file="m3aggregator_intro.md" >}}
2 changes: 1 addition & 1 deletion src/dbnode/generated/mocks/generate.go
@@ -24,7 +24,7 @@
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
-//go:generate sh -c "mockgen -package=lookup $PACKAGE/src/dbnode/storage/series/lookup IndexWriter | genclean -pkg $PACKAGE/src/dbnode/storage/series/lookup -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/lookup/lookup_mock.go"
+//go:generate sh -c "mockgen -package=storage $PACKAGE/src/dbnode/storage IndexWriter | genclean -pkg $PACKAGE/src/dbnode/storage -out $GOPATH/src/$PACKAGE/src/dbnode/storage/lookup_mock.go"

// mockgen rules for generating mocks for unexported interfaces (file mode)
//go:generate sh -c "mockgen -package=encoding -destination=$GOPATH/src/$PACKAGE/src/dbnode/encoding/encoding_mock.go -source=$GOPATH/src/$PACKAGE/src/dbnode/encoding/types.go"
323 changes: 323 additions & 0 deletions src/dbnode/integration/index_active_block_rotate_test.go
@@ -0,0 +1,323 @@
// +build integration
//
// Copyright (c) 2021 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package integration

import (
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"go.uber.org/zap/zaptest/observer"

"github.com/m3db/m3/src/dbnode/integration/generate"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/retention"
"github.com/m3db/m3/src/dbnode/storage/index/compaction"
xclock "github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
)

func TestIndexActiveBlockRotate(t *testing.T) {
var (
testNsID = ident.StringID("testns")
numWrites = 50
numTags = 10
blockSize = 2 * time.Hour
indexBlockSize = blockSize * 2
retentionPeriod = 12 * blockSize
bufferPast = 10 * time.Minute
rOpts = retention.NewOptions().
SetRetentionPeriod(retentionPeriod).
SetBlockSize(blockSize).
SetBufferPast(bufferPast)

idxOpts = namespace.NewIndexOptions().SetEnabled(true).SetBlockSize(indexBlockSize)
nsOpts = namespace.NewOptions().
SetRetentionOptions(rOpts).
SetIndexOptions(idxOpts).
SetColdWritesEnabled(true)

defaultTimeout = time.Minute
// verifyTimeout = time.Minute
)
ns, err := namespace.NewMetadata(testNsID, nsOpts)
require.NoError(t, err)

// Set time to next warm flushable block transition
// (i.e. align by block + bufferPast - time.Second)
currTime := time.Now().UTC()
progressTime := false
progressTimeDelta := time.Duration(0)
lockTime := sync.RWMutex{}
setTime := func(t time.Time) {
lockTime.Lock()
defer lockTime.Unlock()
progressTime = false
currTime = t.UTC()
}
setProgressTime := func() {
lockTime.Lock()
defer lockTime.Unlock()
progressTime = true
actualNow := time.Now().UTC()
progressTimeDelta = currTime.Sub(actualNow)
}
nowFn := func() time.Time {
lockTime.RLock()
at := currTime
progress := progressTime
progressDelta := progressTimeDelta
lockTime.RUnlock()
if progress {
return time.Now().UTC().Add(progressDelta)
}
return at
}

testOpts := NewTestOptions(t).
SetNamespaces([]namespace.Metadata{ns}).
SetWriteNewSeriesAsync(true).
SetNowFn(nowFn)

testSetup, err := NewTestSetup(t, testOpts, nil)
require.NoError(t, err)
defer testSetup.Close()

// Write test data to disk so that some blocks already exist on disk,
// simulating index blocks that already contain on-disk segments.
require.NoError(t, testSetup.InitializeBootstrappers(InitializeBootstrappersOptions{
WithFileSystem: true,
}))
now := testSetup.NowFn()()
fooSeries := generate.Series{
ID: ident.StringID("foo"),
Tags: ident.NewTags(ident.StringTag("city", "new_york")),
}
barSeries := generate.Series{
ID: ident.StringID("bar"),
Tags: ident.NewTags(ident.StringTag("city", "new_jersey")),
}
seriesMaps := generate.BlocksByStart([]generate.BlockConfig{
{
IDs: []string{fooSeries.ID.String()},
Tags: fooSeries.Tags,
NumPoints: 100,
Start: now.Add(-3 * blockSize),
},
{
IDs: []string{barSeries.ID.String()},
Tags: barSeries.Tags,
NumPoints: 100,
Start: now.Add(-3 * blockSize),
},
})
require.NoError(t, writeTestDataToDisk(ns, testSetup, seriesMaps, 0))

// Set foreground compaction planner options to force index compaction.
minCompactSize := 10
foregroundCompactionOpts := compaction.DefaultOptions
foregroundCompactionOpts.Levels = []compaction.Level{
{
MinSizeInclusive: 0,
MaxSizeExclusive: int64(minCompactSize),
},
}
indexOpts := testSetup.StorageOpts().IndexOptions().
SetForegroundCompactionPlannerOptions(foregroundCompactionOpts)
testSetup.SetStorageOpts(testSetup.StorageOpts().SetIndexOptions(indexOpts))

// Configure log capture
log := testSetup.StorageOpts().InstrumentOptions().Logger()
captureCore, logs := observer.New(zapcore.ErrorLevel)
zapOpt := zap.WrapCore(func(existingCore zapcore.Core) zapcore.Core {
return zapcore.NewTee(existingCore, captureCore)
})
log = log.WithOptions(zapOpt)

// Wire up logger.
instrumentOpts := testSetup.StorageOpts().InstrumentOptions().
SetLogger(log)
testSetup.SetStorageOpts(testSetup.StorageOpts().SetInstrumentOptions(instrumentOpts))
scope := testSetup.Scope()

// Start the server.
require.NoError(t, testSetup.StartServer())

// Stop the server.
defer func() {
require.NoError(t, testSetup.StopServer())
log.Debug("server is now down")
}()

// Write test data.
session, err := testSetup.M3DBClient().DefaultSession()
require.NoError(t, err)

var (
metricGCSeries = "index.block.active-block.gc-series+namespace=" + testNsID.String()
metricFlushIndex = "database.flushIndex.success+namespace=" + testNsID.String()
)
prevWarmFlushes := counterValue(t, scope, metricFlushIndex)
prevNumGCSeries := 0
numGCSeries := counterValue(t, scope, metricGCSeries)
require.Equal(t, 0, numGCSeries)

prevLog := log
for i := 0; i < 4; i++ {
log = prevLog.With(zap.Int("checkIteration", i))

// Progress to next time just before a flush and freeze (using setTime).
prevTime := nowFn()
newTime := prevTime.
Truncate(indexBlockSize).
Add(2 * indexBlockSize)
setTime(newTime)
log.Info("progressing time to before next block edge",
zap.Stringer("prevTime", prevTime),
zap.Stringer("newTime", newTime))

start := time.Now()
log.Info("writing test data")

t0 := xtime.ToUnixNano(newTime.Add(-1 * (bufferPast / 2)))
t1 := xtime.ToUnixNano(newTime)
writesPeriodIter := GenerateTestIndexWrite(i, numWrites, numTags, t0, t1)
writesPeriodIter.Write(t, testNsID, session)
log.Info("test data written", zap.Duration("took", time.Since(start)))

log.Info("waiting till data is indexed")
indexed := xclock.WaitUntil(func() bool {
indexedPeriod := writesPeriodIter.NumIndexed(t, testNsID, session)
return indexedPeriod == len(writesPeriodIter)
}, 15*time.Second)
require.True(t, indexed,
fmt.Sprintf("unexpected data indexed: actual=%d, expected=%d",
writesPeriodIter.NumIndexedWithOptions(t, testNsID, session, NumIndexedOptions{Logger: log}),
len(writesPeriodIter)))
log.Info("verified data is indexed", zap.Duration("took", time.Since(start)))

newTime = prevTime.
Truncate(indexBlockSize).
Add(2 * indexBlockSize).
Add(bufferPast).
Add(-100 * time.Millisecond)
setTime(newTime)
log.Info("progressing time to before next flush",
zap.Stringer("prevTime", prevTime),
zap.Stringer("newTime", newTime))

log.Info("waiting till warm flush occurs")

// Resume time progressing by wall clock.
setProgressTime()

// Start checks to ensure metrics are visible the whole time.
checkFailed := atomic.NewUint64(0)
checkIndexable := func() {
numGCSeriesBefore := counterValue(t, scope, metricGCSeries)
indexedPeriod := writesPeriodIter.NumIndexed(t, testNsID, session)
numGCSeriesAfter := counterValue(t, scope, metricGCSeries)
if len(writesPeriodIter) != indexedPeriod {
assert.Equal(t, len(writesPeriodIter), indexedPeriod,
fmt.Sprintf("some metrics not indexed/visible: actual=%d, expected=%d, numGCBefore=%d, numGCAfter=%d",
writesPeriodIter.NumIndexedWithOptions(t, testNsID, session, NumIndexedOptions{Logger: log}),
len(writesPeriodIter),
numGCSeriesBefore,
numGCSeriesAfter))
checkFailed.Inc()
}
}

ticker := time.NewTicker(10 * time.Millisecond)
stopTickCh := make(chan struct{})
closedTickCh := make(chan struct{})
go func() {
defer func() {
ticker.Stop()
close(closedTickCh)
}()

for {
select {
case <-ticker.C:
checkIndexable()
case <-stopTickCh:
return
}
}
}()

start = time.Now()
warmFlushed := xclock.WaitUntil(func() bool {
return counterValue(t, scope, metricFlushIndex)-prevWarmFlushes > 0
}, defaultTimeout)
counter := counterValue(t, scope, metricFlushIndex)
require.True(t, warmFlushed,
fmt.Sprintf("warm flush stats: current=%d, previous=%d", counter, prevWarmFlushes))
log.Info("verified data has been warm flushed", zap.Duration("took", time.Since(start)))
prevWarmFlushes = counter

start = time.Now()
log.Info("waiting for GC of series")

expectedNumGCSeries := prevNumGCSeries + numWrites - minCompactSize
gcSeries := xclock.WaitUntil(func() bool {
numGCSeries := counterValue(t, scope, metricGCSeries)
return numGCSeries >= expectedNumGCSeries
}, defaultTimeout)
numGCSeries := counterValue(t, scope, metricGCSeries)
require.True(t, gcSeries,
fmt.Sprintf("unexpected num gc series: actual=%d, expected=%d",
numGCSeries, expectedNumGCSeries))
require.True(t, numGCSeries >= expectedNumGCSeries)
log.Info("verified series have been GC'd", zap.Duration("took", time.Since(start)))
prevNumGCSeries = numGCSeries

require.Equal(t, 0, logs.Len(), "errors found in logs during flush/indexing")

// Keep running indexable check for a few seconds, then progress next iter.
time.Sleep(5 * time.Second)
close(stopTickCh)
<-closedTickCh

// Ensure check did not fail.
require.True(t, checkFailed.Load() == 0,
fmt.Sprintf("check indexable errors: %d", checkFailed.Load()))
}

log.Info("checks passed")
}
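
The `setTime`, `setProgressTime`, and `nowFn` closures in the test above implement a clock that can be frozen at a chosen instant and later released to advance with the wall clock. The sketch below restates that idea as a small standalone type. `testClock`, `newTestClock`, and `demo` are hypothetical names introduced here for illustration; they are not part of M3.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// testClock is a hypothetical helper (not part of M3) that mirrors the
// setTime / setProgressTime / nowFn closures from the test.
type testClock struct {
	mu       sync.RWMutex
	frozenAt time.Time     // value returned while the clock is frozen
	progress bool          // when true, time advances with the wall clock
	delta    time.Duration // offset between the fake time and the wall clock
}

func newTestClock(start time.Time) *testClock {
	return &testClock{frozenAt: start.UTC()}
}

// Set freezes the clock at t.
func (c *testClock) Set(t time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.progress = false
	c.frozenAt = t.UTC()
}

// Resume lets the clock advance again, starting from the frozen value.
func (c *testClock) Resume() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.progress = true
	c.delta = c.frozenAt.Sub(time.Now().UTC())
}

// Now returns the frozen time, or the wall-clock time shifted by delta.
func (c *testClock) Now() time.Time {
	c.mu.RLock()
	at, progress, delta := c.frozenAt, c.progress, c.delta
	c.mu.RUnlock()
	if progress {
		return time.Now().UTC().Add(delta)
	}
	return at
}

// demo exercises both modes and reports whether each behaved as expected.
func demo() (frozenOK, advancedOK bool) {
	start := time.Date(2021, time.August, 26, 0, 0, 0, 0, time.UTC)
	clock := newTestClock(start)

	// While frozen, Now keeps returning the value that was set.
	frozenOK = clock.Now().Equal(start)

	// After Resume, Now moves forward from the frozen value.
	clock.Resume()
	time.Sleep(20 * time.Millisecond)
	advancedOK = clock.Now().After(start)
	return frozenOK, advancedOK
}

func main() {
	frozenOK, advancedOK := demo()
	fmt.Println("frozen:", frozenOK, "advanced:", advancedOK)
}
```

Freezing the clock makes it possible to place writes at an exact offset from a block edge, while resuming lets background work such as flushing observe time passing.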

func counterValue(t *testing.T, r tally.TestScope, key string) int {
v, ok := r.Snapshot().Counters()[key]
require.True(t, ok)
return int(v.Value())
}
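
While waiting for the warm flush and series GC, the test runs `checkIndexable` on a 10ms ticker in a goroutine and uses two channels (`stopTickCh`, `closedTickCh`) to stop it and wait for it to exit. Below is a minimal standalone sketch of that pattern. `runChecker`, `countChecks`, and `demo` are hypothetical names, and the sketch swaps in the standard library `sync/atomic` for the `go.uber.org/atomic` counter used in the test.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// runChecker is a hypothetical helper: it calls check on every tick until
// stop is closed, and closes the returned channel once the goroutine exits.
func runChecker(interval time.Duration, check func(), stop <-chan struct{}) <-chan struct{} {
	done := make(chan struct{})
	ticker := time.NewTicker(interval)
	go func() {
		defer func() {
			ticker.Stop()
			close(done)
		}()
		for {
			select {
			case <-ticker.C:
				check()
			case <-stop:
				return
			}
		}
	}()
	return done
}

// countChecks runs ok on every tick for roughly d and reports how many
// checks ran and how many of them failed.
func countChecks(interval, d time.Duration, ok func() bool) (ran, failed int64) {
	var nRan, nFailed int64
	stop := make(chan struct{})
	done := runChecker(interval, func() {
		atomic.AddInt64(&nRan, 1)
		if !ok() {
			atomic.AddInt64(&nFailed, 1)
		}
	}, stop)

	time.Sleep(d)
	close(stop)
	<-done // the goroutine has exited and no longer touches the counters

	return atomic.LoadInt64(&nRan), atomic.LoadInt64(&nFailed)
}

// demo runs an always-passing check every 5ms for about 100ms.
func demo() (ran, failed int64) {
	return countChecks(5*time.Millisecond, 100*time.Millisecond, func() bool { return true })
}

func main() {
	ran, failed := demo()
	fmt.Printf("ran=%d failed=%d\n", ran, failed)
}
```

Waiting on the second channel before reading the failure count is what lets the test assert on `checkFailed` without racing the background goroutine.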
14 changes: 11 additions & 3 deletions src/dbnode/integration/index_block_rotation_test.go
@@ -134,12 +134,20 @@ func TestIndexBlockRotation(t *testing.T) {
log.Info("querying period0 results after expiry")
// await for results to be empty.
// in practice we've seen it take 11s, so make it 30s to be safe.
+time.Sleep(time.Second * 4)
timeout := time.Second * 30
-empty := xclock.WaitUntil(func() bool {
+noData := xclock.WaitUntil(func() bool {
period0Results, _, err = session.FetchTagged(ContextWithDefaultTimeout(),
md.ID(), query, index.QueryOptions{StartInclusive: t0, EndExclusive: t1})
require.NoError(t, err)
-return period0Results.Len() == 0
+require.True(t, period0Results.Len() == 50, "results still indexed")
+
+for _, i := range period0Results.Iters() {
+if i.Next() {
+return false
+}
+}
+return true
}, timeout)
-require.True(t, empty, "results not empty after %s", timeout)
+require.True(t, noData, "data still present after %s", timeout)
}
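
The updated check in this hunk appears to change what "expired" means for the query: the 50 series are still expected to be returned, but every iterator must yield no datapoints before the wait succeeds. The sketch below illustrates that shape with a polling helper and a toy iterator. `waitUntil`, `seriesIter`, `allEmpty`, and `demo` are hypothetical stand-ins, and the real `xclock.WaitUntil` and M3 iterator types may behave differently.

```go
package main

import (
	"fmt"
	"time"
)

// waitUntil is a hypothetical stand-in for a polling helper such as
// xclock.WaitUntil: it retries cond until it returns true or timeout passes.
func waitUntil(cond func() bool, timeout, interval time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for {
		if cond() {
			return true
		}
		if time.Now().After(deadline) {
			return false
		}
		time.Sleep(interval)
	}
}

// seriesIter is a hypothetical minimal iterator: Next reports whether
// another datapoint is available.
type seriesIter struct{ remaining int }

func (it *seriesIter) Next() bool {
	if it.remaining == 0 {
		return false
	}
	it.remaining--
	return true
}

// allEmpty reports whether none of the iterators yields a datapoint.
func allEmpty(iters []*seriesIter) bool {
	for _, it := range iters {
		if it.Next() {
			return false
		}
	}
	return true
}

// demo polls a simulated query and also shows the timeout path.
func demo() (becameEmpty, timedOut bool) {
	// fetch simulates a query whose series stay indexed but whose data
	// has expired by the third poll.
	polls := 0
	fetch := func() []*seriesIter {
		polls++
		if polls < 3 {
			return []*seriesIter{{remaining: 2}, {remaining: 0}}
		}
		return []*seriesIter{{remaining: 0}, {remaining: 0}}
	}

	becameEmpty = waitUntil(func() bool { return allEmpty(fetch()) },
		time.Second, time.Millisecond)
	timedOut = !waitUntil(func() bool { return false },
		20*time.Millisecond, time.Millisecond)
	return becameEmpty, timedOut
}

func main() {
	becameEmpty, timedOut := demo()
	fmt.Println("became empty:", becameEmpty, "timed out:", timedOut)
}
```

Polling with a generous timeout, rather than sleeping for a fixed duration, keeps the test fast when expiry happens quickly while still tolerating slow runs.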