Skip to content

Commit

Permalink
[dbnode] Refactor wide query path (#2826)
Browse files Browse the repository at this point in the history
  • Loading branch information
arnikola authored and robskillington committed Nov 12, 2020
1 parent 3746bf0 commit dd38ead
Show file tree
Hide file tree
Showing 48 changed files with 1,144 additions and 3,289 deletions.
14 changes: 11 additions & 3 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ linters-settings:
locale: US
ignore-words:
- someword
exhaustive:
default-signifies-exhaustive: true
lll:
# max line length, lines longer will be reported. Default is 120.
# '\t' is counted as 1 character by default, and can be changed with the tab-width option
Expand Down Expand Up @@ -172,7 +174,6 @@ linters:
- goconst
- gocritic
- gocyclo
- godox
- goimports
- golint
- gosimple
Expand Down Expand Up @@ -211,10 +212,17 @@ linters:
- exhaustivestruct
# We allow cuddling assignment following conditions because there are valid
# logical groupings for this use-case (e.g. when evaluating config values).
- wsl
- wsl
# Wrapcheck can cause errors until all callsites checking explicit error
# types like io.EOF are converted to use errors.Is instead. Re-enable this
# linter once all error checks are upgraded.
- wrapcheck
# godox prevents using TODOs or FIXMEs which can be useful for demarkation
# of future work.
- godox
# New line required before return would require a large fraction of the
# code base to need updating, it's not worth the perceived benefit.
- nlreturn
- nlreturn
disable-all: false
presets:
# bodyclose, errcheck, gosec, govet, scopelint, staticcheck, typecheck
Expand Down
7 changes: 4 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ tools-linux-amd64:
$(LINUX_AMD64_ENV) make tools

.PHONY: all
all: lint test-ci-unit test-ci-integration services tools
all: test-ci-unit test-ci-integration services tools
@echo Made all successfully

.PHONY: install-tools
Expand Down Expand Up @@ -256,7 +256,7 @@ SUBDIR_TARGETS := \
asset-gen \
genny-gen \
license-gen \
all-gen \
all-gen \
lint

.PHONY: test-ci-unit
Expand Down Expand Up @@ -384,6 +384,7 @@ endef

# generate targets across SUBDIRS for each SUBDIR_TARGET. i.e. generate rules
# which allow `make all-gen` to invoke `make all-gen-dbnode all-gen-coordinator ...`
# NB: we skip lint explicity as it runs as a separate CI step.
$(foreach SUBDIR_TARGET, $(SUBDIR_TARGETS), $(eval $(SUBDIR_TARGET_RULE)))

# Builds the single kube bundle from individual manifest files.
Expand All @@ -401,7 +402,7 @@ go-mod-tidy:
.PHONY: all-gen
all-gen: \
install-tools \
$(foreach SUBDIR_TARGET, $(SUBDIR_TARGETS), $(SUBDIR_TARGET)) \
$(foreach SUBDIR_TARGET, $(filter-out lint all-gen,$(SUBDIR_TARGETS)), $(SUBDIR_TARGET)) \
kube-gen-all \
go-mod-tidy

Expand Down
8 changes: 4 additions & 4 deletions src/dbnode/encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ type Encoder interface {
DiscardReset(t time.Time, capacity int, schema namespace.SchemaDescr) ts.Segment
}

// NewEncoderFn creates a new encoder
// NewEncoderFn creates a new encoder.
type NewEncoderFn func(start time.Time, bytes []byte) Encoder

// Options represents different options for encoding time as well as markers.
Expand Down Expand Up @@ -178,7 +178,7 @@ type Iterator interface {
// object as it may get invalidated when the iterator calls Next().
Current() (ts.Datapoint, xtime.Unit, ts.Annotation)

// Err returns the error encountered
// Err returns the error encountered.
Err() error

// Close closes the iterator and if pooled will return to the pool.
Expand Down Expand Up @@ -367,9 +367,9 @@ type IStream interface {

// OStream encapsulates a writable stream.
type OStream interface {
// Len returns the length of the OStream
// Len returns the length of the OStream.
Len() int
// Empty returns whether the OStream is empty
// Empty returns whether the OStream is empty.
Empty() bool

// WriteBit writes the last bit of v.
Expand Down
3 changes: 1 addition & 2 deletions src/dbnode/generated/mocks/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

// mockgen rules for generating mocks for exported interfaces (reflection mode)

//go:generate sh -c "mockgen -package=wide $PACKAGE/src/dbnode/persist/fs/wide EntryChecksumMismatchChecker,StreamedMismatch | genclean -pkg $PACKAGE/src/dbnode/persist/fs/wide -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/wide/wide_mock.go"
//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,CrossBlockReader,CrossBlockIterator,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs CrossBlockReader,CrossBlockIterator,DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
Expand Down
114 changes: 21 additions & 93 deletions src/dbnode/integration/wide_query_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// +build integration
// +build big
//
// Copyright (c) 2020 Uber Technologies, Inc.
//
Expand All @@ -23,7 +23,6 @@
package integration

import (
"bytes"
"fmt"
"io/ioutil"
"runtime"
Expand All @@ -32,10 +31,8 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/persist/fs/wide"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/sharding"
"github.com/m3db/m3/src/dbnode/storage"
Expand All @@ -60,9 +57,9 @@ const (
wideTagValFmt = "val-%05d"
)

type shardedIndexChecksum struct {
shard uint32
checksums []schema.IndexChecksum
type shardedWideEntry struct {
shard uint32
entries []schema.WideEntry
}

// buildExpectedChecksumsByShard sorts the given IDs into ascending shard order,
Expand All @@ -72,10 +69,10 @@ func buildExpectedChecksumsByShard(
allowedShards []uint32,
shardSet sharding.ShardSet,
batchSize int,
) []schema.IndexChecksum {
shardedChecksums := make([]shardedIndexChecksum, 0, len(ids))
) []schema.WideEntry {
shardedEntries := make([]shardedWideEntry, 0, len(ids))
for i, id := range ids {
checksum := schema.IndexChecksum{
entry := schema.WideEntry{
IndexEntry: schema.IndexEntry{
ID: []byte(id),
},
Expand All @@ -98,50 +95,36 @@ func buildExpectedChecksumsByShard(
}

found := false
for idx, sharded := range shardedChecksums {
for idx, sharded := range shardedEntries {
if shard != sharded.shard {
continue
}

found = true
shardedChecksums[idx].checksums = append(sharded.checksums, checksum)
shardedEntries[idx].entries = append(sharded.entries, entry)
break
}

if found {
continue
}

shardedChecksums = append(shardedChecksums, shardedIndexChecksum{
shard: shard,
checksums: []schema.IndexChecksum{checksum},
shardedEntries = append(shardedEntries, shardedWideEntry{
shard: shard,
entries: []schema.WideEntry{entry},
})
}

sort.Slice(shardedChecksums, func(i, j int) bool {
return shardedChecksums[i].shard < shardedChecksums[j].shard
sort.Slice(shardedEntries, func(i, j int) bool {
return shardedEntries[i].shard < shardedEntries[j].shard
})

var checksums []schema.IndexChecksum
for _, sharded := range shardedChecksums {
checksums = append(checksums, sharded.checksums...)
var entries []schema.WideEntry
for _, sharded := range shardedEntries {
entries = append(entries, sharded.entries...)
}

// NB: IDs should only be included for documents that conclude a batch.
l := len(checksums)
if l == 0 {
return checksums
}

// NB: only look at the last `l-1` elements, as the last element should
// always have its ID.
for i, checksum := range checksums[:l-1] {
if (i+1)%batchSize != 0 {
checksums[i].ID = checksum.ID[:0]
}
}

return checksums
return entries
}

func assertTags(
Expand All @@ -164,28 +147,6 @@ func assertTags(
require.NoError(t, decoder.Err())
}

func assertData(
t *testing.T,
ex int64,
exTime time.Time,
mismatch wide.ReadMismatch,
) {
mismatch.Data.IncRef()
mismatchData := mismatch.Data.Bytes()
mismatch.Data.DecRef()

decoder := m3tsz.NewDecoder(true, nil)
dataReader := bytes.NewBuffer(mismatchData)
it := decoder.Decode(dataReader)
assert.NoError(t, it.Err())
assert.True(t, it.Next())
ts, _, _ := it.Current()
assert.True(t, ts.Timestamp.Equal(exTime))
assert.Equal(t, float64(ex), ts.Value)
assert.False(t, it.Next())
assert.NoError(t, it.Err())
}

func TestWideFetch(t *testing.T) {
if testing.Short() {
t.SkipNow() // Just skip if we're doing a short run
Expand Down Expand Up @@ -315,10 +276,6 @@ func TestWideFetch(t *testing.T) {
decoder := tagDecoderPool.Get()
defer decoder.Close()

wideOpts := wide.NewOptions().
SetDecodingOptions(decOpts).
SetBatchSize(batchSize)

for _, tt := range shardFilterTests {
t.Run(tt.name, func(t *testing.T) {
ctx := context.NewContext()
Expand All @@ -333,32 +290,6 @@ func TestWideFetch(t *testing.T) {
assert.Equal(t, expected[i].MetadataChecksum, checksum.MetadataChecksum)
require.Equal(t, string(expected[i].ID), checksum.ID.String())
assertTags(t, checksum.EncodedTags, decoder, checksum.MetadataChecksum)
checksum.Finalize()
}

ctx.Close()
})

t.Run(fmt.Sprintf("%s_checksum_mismatch", tt.name), func(t *testing.T) {
ctx := context.NewContext()
// NB: empty index checksum blocks.
inCh := make(chan wide.IndexChecksumBlockBatch)
batchReader := wide.NewIndexChecksumBlockBatchReader(inCh)
close(inCh)

checker := wide.NewEntryChecksumMismatchChecker(batchReader, wideOpts)
mismatches, err := testSetup.DB().ReadMismatches(ctx, nsMetadata.ID(), query,
checker, now, tt.shards, iterOpts)
require.NoError(t, err)

expected := buildExpectedChecksumsByShard(ids, tt.shards,
testSetup.ShardSet(), batchSize)
require.Equal(t, len(expected), len(mismatches))
for i, mismatch := range mismatches {
assert.Equal(t, expected[i].MetadataChecksum, mismatch.MetadataChecksum)
assertTags(t, mismatch.EncodedTags, decoder, mismatch.MetadataChecksum)
assertData(t, expected[i].MetadataChecksum, now, mismatch)
mismatch.Finalize()
}

ctx.Close()
Expand Down Expand Up @@ -394,15 +325,14 @@ func TestWideFetch(t *testing.T) {
now, tt.shards, iterOpts)
require.NoError(t, err)

if !tt.expected {
assert.Equal(t, 0, len(chk))
} else {
if tt.expected {
require.Equal(t, 1, len(chk))
checksum := chk[0]
assert.Equal(t, int64(1), checksum.MetadataChecksum)
assert.Equal(t, exactID, checksum.ID.String())
assertTags(t, checksum.EncodedTags, decoder, checksum.MetadataChecksum)
checksum.Finalize()
} else {
assert.Equal(t, 0, len(chk))
}

ctx.Close()
Expand Down Expand Up @@ -443,8 +373,6 @@ func TestWideFetch(t *testing.T) {
expected[i].MetadataChecksum, checksum.MetadataChecksum)
break
}

checksum.Finalize()
}

ctx.Close()
Expand Down
Loading

0 comments on commit dd38ead

Please sign in to comment.