Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Refactor wide query path #2826

Merged
merged 25 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ linters-settings:
locale: US
ignore-words:
- someword
exhaustive:
default-signifies-exhaustive: true
lll:
# max line length, lines longer will be reported. Default is 120.
# '\t' is counted as 1 character by default, and can be changed with the tab-width option
Expand Down Expand Up @@ -172,7 +174,6 @@ linters:
- goconst
- gocritic
- gocyclo
- godox
- goimports
- golint
- gosimple
Expand Down Expand Up @@ -211,10 +212,17 @@ linters:
- exhaustivestruct
# We allow cuddling assignment following conditions because there are valid
# logical groupings for this use-case (e.g. when evaluating config values).
- wsl
- wsl
# Wrapcheck can cause errors until all callsites checking explicit error
# types like io.EOF are converted to use errors.Is instead. Re-enable this
# linter once all error checks are upgraded.
- wrapcheck
# godox prevents using TODOs or FIXMEs which can be useful for demarkation
# of future work.
- godox
# New line required before return would require a large fraction of the
# code base to need updating, it's not worth the perceived benefit.
- nlreturn
- nlreturn
disable-all: false
presets:
# bodyclose, errcheck, gosec, govet, scopelint, staticcheck, typecheck
Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ tools-linux-amd64:
$(LINUX_AMD64_ENV) make tools

.PHONY: all
all: lint test-ci-unit test-ci-integration services tools
all: test-ci-unit test-ci-integration services tools
@echo Made all successfully

.PHONY: install-tools
Expand Down Expand Up @@ -256,7 +256,7 @@ SUBDIR_TARGETS := \
asset-gen \
genny-gen \
license-gen \
all-gen \
all-gen \
lint

.PHONY: test-ci-unit
Expand Down Expand Up @@ -384,6 +384,7 @@ endef

# generate targets across SUBDIRS for each SUBDIR_TARGET. i.e. generate rules
# which allow `make all-gen` to invoke `make all-gen-dbnode all-gen-coordinator ...`
# NB: we skip lint explicity as it runs as a separate CI step.
$(foreach SUBDIR_TARGET, $(SUBDIR_TARGETS), $(eval $(SUBDIR_TARGET_RULE)))

# Builds the single kube bundle from individual manifest files.
Expand All @@ -401,7 +402,7 @@ go-mod-tidy:
.PHONY: all-gen
all-gen: \
install-tools \
$(foreach SUBDIR_TARGET, $(SUBDIR_TARGETS), $(SUBDIR_TARGET)) \
$(foreach SUBDIR_TARGET, $(filter-out lint all-gen,$(SUBDIR_TARGETS)), $(SUBDIR_TARGET)) \
kube-gen-all \
go-mod-tidy

Expand Down
8 changes: 4 additions & 4 deletions src/dbnode/encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Encoder interface {
DiscardReset(t time.Time, capacity int, schema namespace.SchemaDescr) ts.Segment
}

// NewEncoderFn creates a new encoder
// NewEncoderFn creates a new encoder.
type NewEncoderFn func(start time.Time, bytes []byte) Encoder

// Options represents different options for encoding time as well as markers.
Expand Down Expand Up @@ -178,7 +178,7 @@ type Iterator interface {
// object as it may get invalidated when the iterator calls Next().
Current() (ts.Datapoint, xtime.Unit, ts.Annotation)

// Err returns the error encountered
// Err returns the error encountered.
Err() error

// Close closes the iterator and if pooled will return to the pool.
Expand Down Expand Up @@ -367,9 +367,9 @@ type IStream interface {

// OStream encapsulates a writable stream.
type OStream interface {
// Len returns the length of the OStream
// Len returns the length of the OStream.
Len() int
// Empty returns whether the OStream is empty
// Empty returns whether the OStream is empty.
Empty() bool

// WriteBit writes the last bit of v.
Expand Down
3 changes: 1 addition & 2 deletions src/dbnode/generated/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

// mockgen rules for generating mocks for exported interfaces (reflection mode)

//go:generate sh -c "mockgen -package=wide $PACKAGE/src/dbnode/persist/fs/wide EntryChecksumMismatchChecker,StreamedMismatch | genclean -pkg $PACKAGE/src/dbnode/persist/fs/wide -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/wide/wide_mock.go"
//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,CrossBlockReader,CrossBlockIterator,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs CrossBlockReader,CrossBlockIterator,DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
Expand Down
114 changes: 21 additions & 93 deletions src/dbnode/integration/wide_query_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build integration
// +build big
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
Expand All @@ -23,7 +23,6 @@
package integration

import (
"bytes"
"fmt"
"io/ioutil"
"runtime"
Expand All @@ -32,10 +31,8 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/wide"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage"
Expand All @@ -60,9 +57,9 @@ const (
wideTagValFmt = "val-%05d"
)

type shardedIndexChecksum struct {
shard uint32
checksums []schema.IndexChecksum
type shardedWideEntry struct {
shard uint32
entries []schema.WideEntry
}

// buildExpectedChecksumsByShard sorts the given IDs into ascending shard order,
Expand All @@ -72,10 +69,10 @@ func buildExpectedChecksumsByShard(
allowedShards []uint32,
shardSet sharding.ShardSet,
batchSize int,
) []schema.IndexChecksum {
shardedChecksums := make([]shardedIndexChecksum, 0, len(ids))
) []schema.WideEntry {
shardedEntries := make([]shardedWideEntry, 0, len(ids))
for i, id := range ids {
checksum := schema.IndexChecksum{
entry := schema.WideEntry{
IndexEntry: schema.IndexEntry{
ID: []byte(id),
},
Expand All @@ -98,50 +95,36 @@ func buildExpectedChecksumsByShard(
}

found := false
for idx, sharded := range shardedChecksums {
for idx, sharded := range shardedEntries {
if shard != sharded.shard {
continue
}

found = true
shardedChecksums[idx].checksums = append(sharded.checksums, checksum)
shardedEntries[idx].entries = append(sharded.entries, entry)
break
}

if found {
continue
}

shardedChecksums = append(shardedChecksums, shardedIndexChecksum{
shard: shard,
checksums: []schema.IndexChecksum{checksum},
shardedEntries = append(shardedEntries, shardedWideEntry{
shard: shard,
entries: []schema.WideEntry{entry},
})
}

sort.Slice(shardedChecksums, func(i, j int) bool {
return shardedChecksums[i].shard < shardedChecksums[j].shard
sort.Slice(shardedEntries, func(i, j int) bool {
return shardedEntries[i].shard < shardedEntries[j].shard
})

var checksums []schema.IndexChecksum
for _, sharded := range shardedChecksums {
checksums = append(checksums, sharded.checksums...)
var entries []schema.WideEntry
for _, sharded := range shardedEntries {
entries = append(entries, sharded.entries...)
}

// NB: IDs should only be included for documents that conclude a batch.
l := len(checksums)
if l == 0 {
return checksums
}

// NB: only look at the last `l-1` elements, as the last element should
// always have its ID.
for i, checksum := range checksums[:l-1] {
if (i+1)%batchSize != 0 {
checksums[i].ID = checksum.ID[:0]
}
}

return checksums
return entries
}

func assertTags(
Expand All @@ -164,28 +147,6 @@ func assertTags(
require.NoError(t, decoder.Err())
}

func assertData(
t *testing.T,
ex int64,
exTime time.Time,
mismatch wide.ReadMismatch,
) {
mismatch.Data.IncRef()
mismatchData := mismatch.Data.Bytes()
mismatch.Data.DecRef()

decoder := m3tsz.NewDecoder(true, nil)
dataReader := bytes.NewBuffer(mismatchData)
it := decoder.Decode(dataReader)
assert.NoError(t, it.Err())
assert.True(t, it.Next())
ts, _, _ := it.Current()
assert.True(t, ts.Timestamp.Equal(exTime))
assert.Equal(t, float64(ex), ts.Value)
assert.False(t, it.Next())
assert.NoError(t, it.Err())
}

func TestWideFetch(t *testing.T) {
if testing.Short() {
t.SkipNow() // Just skip if we're doing a short run
Expand Down Expand Up @@ -315,10 +276,6 @@ func TestWideFetch(t *testing.T) {
decoder := tagDecoderPool.Get()
defer decoder.Close()

wideOpts := wide.NewOptions().
SetDecodingOptions(decOpts).
SetBatchSize(batchSize)

for _, tt := range shardFilterTests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.NewContext()
Expand All @@ -333,32 +290,6 @@ func TestWideFetch(t *testing.T) {
assert.Equal(t, expected[i].MetadataChecksum, checksum.MetadataChecksum)
require.Equal(t, string(expected[i].ID), checksum.ID.String())
assertTags(t, checksum.EncodedTags, decoder, checksum.MetadataChecksum)
checksum.Finalize()
}

ctx.Close()
})

t.Run(fmt.Sprintf("%s_checksum_mismatch", tt.name), func(t *testing.T) {
ctx := context.NewContext()
// NB: empty index checksum blocks.
inCh := make(chan wide.IndexChecksumBlockBatch)
batchReader := wide.NewIndexChecksumBlockBatchReader(inCh)
close(inCh)

checker := wide.NewEntryChecksumMismatchChecker(batchReader, wideOpts)
mismatches, err := testSetup.DB().ReadMismatches(ctx, nsMetadata.ID(), query,
checker, now, tt.shards, iterOpts)
require.NoError(t, err)

expected := buildExpectedChecksumsByShard(ids, tt.shards,
testSetup.ShardSet(), batchSize)
require.Equal(t, len(expected), len(mismatches))
for i, mismatch := range mismatches {
assert.Equal(t, expected[i].MetadataChecksum, mismatch.MetadataChecksum)
assertTags(t, mismatch.EncodedTags, decoder, mismatch.MetadataChecksum)
assertData(t, expected[i].MetadataChecksum, now, mismatch)
mismatch.Finalize()
}

ctx.Close()
Expand Down Expand Up @@ -394,15 +325,14 @@ func TestWideFetch(t *testing.T) {
now, tt.shards, iterOpts)
require.NoError(t, err)

if !tt.expected {
assert.Equal(t, 0, len(chk))
} else {
if tt.expected {
require.Equal(t, 1, len(chk))
checksum := chk[0]
assert.Equal(t, int64(1), checksum.MetadataChecksum)
assert.Equal(t, exactID, checksum.ID.String())
assertTags(t, checksum.EncodedTags, decoder, checksum.MetadataChecksum)
checksum.Finalize()
} else {
assert.Equal(t, 0, len(chk))
}

ctx.Close()
Expand Down Expand Up @@ -443,8 +373,6 @@ func TestWideFetch(t *testing.T) {
expected[i].MetadataChecksum, checksum.MetadataChecksum)
break
}

checksum.Finalize()
}

ctx.Close()
Expand Down
Loading