
[dbnode] Implement shard.ScanData #2981

Merged. 12 commits merged on Dec 7, 2020.
2 changes: 1 addition & 1 deletion src/dbnode/generated/mocks/generate.go
@@ -20,7 +20,7 @@

// mockgen rules for generating mocks for exported interfaces (reflection mode)

//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=fs $PACKAGE/src/dbnode/persist/fs DataFileSetWriter,DataFileSetReader,DataFileSetSeeker,IndexFileSetWriter,IndexFileSetReader,IndexSegmentFileSetWriter,IndexSegmentFileSet,IndexSegmentFile,SnapshotMetadataFileWriter,DataFileSetSeekerManager,ConcurrentDataFileSetSeeker,MergeWith,StreamingWriter,DataEntryProcessor | genclean -pkg $PACKAGE/src/dbnode/persist/fs -out $GOPATH/src/$PACKAGE/src/dbnode/persist/fs/fs_mock.go"
//go:generate sh -c "mockgen -package=xio $PACKAGE/src/dbnode/x/xio SegmentReader,SegmentReaderPool | genclean -pkg $PACKAGE/src/dbnode/x/xio -out $GOPATH/src/$PACKAGE/src/dbnode/x/xio/io_mock.go"
//go:generate sh -c "mockgen -package=digest -destination=$GOPATH/src/$PACKAGE/src/dbnode/digest/digest_mock.go $PACKAGE/src/dbnode/digest ReaderWithDigest"
//go:generate sh -c "mockgen -package=series $PACKAGE/src/dbnode/storage/series DatabaseSeries,QueryableBlockRetriever | genclean -pkg $PACKAGE/src/dbnode/storage/series -out $GOPATH/src/$PACKAGE/src/dbnode/storage/series/series_mock.go"
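Not part of the diff, but for orientation: a minimal sketch of how the mock generated by the rule above could be exercised in a test. It assumes only mockgen's usual Mock&lt;Interface&gt;/NewMock&lt;Interface&gt; naming for the new DataEntryProcessor entry; the test itself is illustrative.

```go
package fs_test

import (
	"testing"

	"github.com/golang/mock/gomock"

	"github.com/m3db/m3/src/dbnode/persist/fs"
)

// Illustrative only: exercises the generated DataEntryProcessor mock,
// assuming mockgen's standard NewMockDataEntryProcessor constructor name.
func TestDataEntryProcessorMockSketch(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	processor := fs.NewMockDataEntryProcessor(ctrl)
	processor.EXPECT().SetEntriesCount(2)
	processor.EXPECT().ProcessEntry(gomock.Any()).Return(nil).Times(2)

	// Code under test would make these calls while scanning a fileset volume.
	processor.SetEntriesCount(2)
	_ = processor.ProcessEntry(fs.StreamedDataEntry{})
	_ = processor.ProcessEntry(fs.StreamedDataEntry{})
}
```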
62 changes: 54 additions & 8 deletions src/dbnode/persist/fs/fs_mock.go

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions src/dbnode/persist/fs/msgpack/decoder.go
@@ -505,7 +505,7 @@ func (dec *Decoder) decodeWideEntry(
var checksum int64
if compare == 0 {
// NB: need to compute hash before freeing entry bytes.
checksum = dec.hasher.HashIndexEntry(entry)
checksum = dec.hasher.HashIndexEntry(entry.ID, entry.EncodedTags, entry.DataChecksum)
return schema.WideEntry{
IndexEntry: entry,
MetadataChecksum: checksum,
@@ -522,7 +522,7 @@ func (dec *Decoder) decodeWideEntry(
return emptyWideEntry, MismatchLookupStatus
}

// compareID must have been before the curret entry.ID, so this
// compareID must have been before the current entry.ID, so this
// ID will not be matched.
return emptyWideEntry, NotFoundLookupStatus
}
22 changes: 14 additions & 8 deletions src/dbnode/persist/fs/read.go
@@ -33,7 +33,6 @@ import (
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs/msgpack"
"github.com/m3db/m3/src/dbnode/persist/schema"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/checked"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/ident"
@@ -373,22 +372,22 @@ func (r *reader) readIndexAndSortByOffsetAsc() error {
return nil
}

func (r *reader) StreamingRead() (ident.BytesID, ts.EncodedTags, []byte, uint32, error) {
func (r *reader) StreamingRead() (StreamedDataEntry, error) {
if !r.streamingEnabled {
return nil, nil, nil, 0, errStreamingRequired
return StreamedDataEntry{}, errStreamingRequired
}

if r.entriesRead >= r.entries {
return nil, nil, nil, 0, io.EOF
return StreamedDataEntry{}, io.EOF
}

entry, err := r.decoder.DecodeIndexEntry(nil)
if err != nil {
return nil, nil, nil, 0, err
return StreamedDataEntry{}, err
}

if entry.Offset+entry.Size > int64(len(r.dataMmap.Bytes)) {
return nil, nil, nil, 0, fmt.Errorf(
return StreamedDataEntry{}, fmt.Errorf(
"attempt to read beyond data file size (offset=%d, size=%d, file size=%d)",
entry.Offset, entry.Size, len(r.dataMmap.Bytes))
}
@@ -397,7 +396,7 @@ func (r *reader) StreamingRead() (ident.BytesID, ts.EncodedTags, []byte, uint32,
// NB(r): _must_ check the checksum against known checksum as the data
// file might not have been verified if we haven't read through the file yet.
if entry.DataChecksum != int64(digest.Checksum(data)) {
return nil, nil, nil, 0, errSeekChecksumMismatch
return StreamedDataEntry{}, errSeekChecksumMismatch
}

r.streamingData = append(r.streamingData[:0], data...)
@@ -406,7 +405,14 @@ func (r *reader) StreamingRead() (ident.BytesID, ts.EncodedTags, []byte, uint32,

r.entriesRead++

return r.streamingID, r.streamingTags, r.streamingData, uint32(entry.DataChecksum), nil
dataEntry := StreamedDataEntry{
ID: r.streamingID,
EncodedTags: r.streamingTags,
Data: r.streamingData,
DataChecksum: uint32(entry.DataChecksum),
}

return dataEntry, nil
}

func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
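Not part of the diff, but to make the new streaming contract concrete: a sketch of a caller draining a reader via the new StreamingRead signature. It assumes the reader was opened with DataReaderOpenOptions.StreamingEnabled set, and it copies each entry because the returned slices are reused on the next call; the helper name drainReader is made up.

```go
package example

import (
	"errors"
	"io"

	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3/src/x/ident"
)

// drainReader is a sketch: it drains a streaming-enabled DataFileSetReader,
// copying each entry because StreamingRead reuses its slices between calls.
func drainReader(r fs.DataFileSetReader) ([]fs.StreamedDataEntry, error) {
	var entries []fs.StreamedDataEntry
	for {
		entry, err := r.StreamingRead()
		if errors.Is(err, io.EOF) {
			return entries, nil // end of volume
		}
		if err != nil {
			return nil, err
		}
		entries = append(entries, fs.StreamedDataEntry{
			ID:           ident.BytesID(append([]byte(nil), entry.ID...)),
			EncodedTags:  append(ts.EncodedTags(nil), entry.EncodedTags...),
			Data:         append([]byte(nil), entry.Data...),
			DataChecksum: entry.DataChecksum,
		})
	}
}
```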
11 changes: 7 additions & 4 deletions src/dbnode/persist/fs/read_write_test.go
@@ -573,16 +573,19 @@ func TestWriterOnlyWritesNonNilBytes(t *testing.T) {

func readData(t *testing.T, reader DataFileSetReader) (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error) {
if reader.StreamingEnabled() {
id, encodedTags, data, checksum, err := reader.StreamingRead()
entry, err := reader.StreamingRead()
if err != nil {
return nil, nil, nil, 0, err
}
var tags = ident.EmptyTagIterator
if len(encodedTags) > 0 {
if len(entry.EncodedTags) > 0 {
tagsDecoder := testTagDecoderPool.Get()
tagsDecoder.Reset(checkedBytes(encodedTags))
tagsDecoder.Reset(checkedBytes(entry.EncodedTags))
require.NoError(t, tagsDecoder.Err())
tags = tagsDecoder
}

return id, tags, checked.NewBytes(data, nil), checksum, err
return entry.ID, tags, checked.NewBytes(entry.Data, nil), entry.DataChecksum, err
}

return reader.Read()
28 changes: 24 additions & 4 deletions src/dbnode/persist/fs/types.go
@@ -153,9 +153,9 @@ type DataFileSetReader interface {

// StreamingRead returns the next unpooled id, encodedTags, data, checksum
// values ordered by id, or error, will return io.EOF at end of volume.
// Can only by used when DataReaderOpenOptions.StreamingEnabled is enabled.
// Note: the returned id, encodedTags and data get invalidated on the next call to StreamingRead.
StreamingRead() (id ident.BytesID, encodedTags ts.EncodedTags, data []byte, checksum uint32, err error)
// Can only by used when DataReaderOpenOptions.StreamingEnabled is true.
// Note: the returned data gets invalidated on the next call to StreamingRead.
StreamingRead() (StreamedDataEntry, error)

// ReadMetadata returns the next id and metadata or error, will return io.EOF at end of volume.
// Use either Read or ReadMetadata to progress through a volume, but not both.
@@ -216,7 +216,7 @@ type DataFileSetSeeker interface {

// SeekIndexEntry returns the IndexEntry for the specified ID. This can be useful
// ahead of issuing a number of seek requests so that the seek requests can be
// made in order. The returned IndexEntry can also be passed to SeekUsingIndexEntry
// made in order. The returned IndexEntry can also be passed to SeekByIndexEntry
// to prevent duplicate index lookups.
SeekIndexEntry(id ident.ID, resources ReusableSeekerResources) (IndexEntry, error)

@@ -677,3 +677,23 @@ type IndexClaimsManager interface {
blockStart time.Time,
) (int, error)
}

// StreamedDataEntry contains the data of single entry returned by streaming method(s).
// The underlying data slices are reused and invalidated on every read.
type StreamedDataEntry struct {
ID ident.BytesID
EncodedTags ts.EncodedTags
Data []byte
DataChecksum uint32
}

// NewReaderFn creates a new DataFileSetReader.
type NewReaderFn func(bytesPool pool.CheckedBytesPool, opts Options) (DataFileSetReader, error)

// DataEntryProcessor processes StreamedDataEntries.
type DataEntryProcessor interface {
// SetEntriesCount sets the number of entries to be processed.
SetEntriesCount(int)
// ProcessEntry processes a single StreamedDataEntry.
ProcessEntry(StreamedDataEntry) error
}
16 changes: 12 additions & 4 deletions src/dbnode/persist/schema/index_entry_hasher.go
@@ -22,6 +22,9 @@ package schema

import (
"github.com/cespare/xxhash/v2"

"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/ident"
)

type xxHasher struct{}
@@ -31,10 +34,15 @@ func NewXXHasher() IndexEntryHasher {
return xxHasher{}
}

func (h xxHasher) HashIndexEntry(e IndexEntry) int64 {
func (x xxHasher) HashIndexEntry(
id ident.BytesID,
encodedTags ts.EncodedTags,
dataChecksum int64,
) int64 {
hash := uint64(7)
hash = 31*hash + xxhash.Sum64(e.ID)
hash = 31*hash + xxhash.Sum64(e.EncodedTags)
hash = 31*hash + uint64(e.DataChecksum)
hash = 31*hash + xxhash.Sum64(id)
hash = 31*hash + xxhash.Sum64(encodedTags)
hash = 31*hash + uint64(dataChecksum)

return int64(hash)
}
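A small usage sketch of the new explicit-argument hasher signature; the input values are arbitrary and purely illustrative.

```go
package example

import (
	"fmt"

	"github.com/m3db/m3/src/dbnode/persist/schema"
	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3/src/x/ident"
)

func hashExample() {
	hasher := schema.NewXXHasher()
	// Arbitrary example values; in real use these come from a decoded IndexEntry.
	id := ident.BytesID("foo.bar.baz")
	tags := ts.EncodedTags([]byte{0x01, 0x02})
	var dataChecksum int64 = 42

	h := hasher.HashIndexEntry(id, tags, dataChecksum)
	fmt.Println(h)
}
```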
3 changes: 2 additions & 1 deletion src/dbnode/persist/schema/index_entry_hasher_test.go
@@ -65,6 +65,7 @@ func TestIndexEntryHash(t *testing.T) {

hasher := NewXXHasher()
for _, tt := range tests {
assert.Equal(t, tt.expected, hasher.HashIndexEntry(tt.entry))
e := tt.entry
assert.Equal(t, tt.expected, hasher.HashIndexEntry(e.ID, e.EncodedTags, e.DataChecksum))
}
}
12 changes: 10 additions & 2 deletions src/dbnode/persist/schema/types.go
@@ -22,6 +22,8 @@ package schema

import (
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/ident"
)

// MajorVersion is the major schema version for a set of fileset files,
@@ -90,9 +92,15 @@ type WideEntryFilter func(entry WideEntry) (bool, error)

// IndexEntryHasher hashes an index entry.
type IndexEntryHasher interface {
// HashIndexEntry computes a hash value for this IndexEntry using its ID, tags,
// HashIndexEntry computes a hash value for this index entry using its ID, tags,
// and the computed data checksum.
HashIndexEntry(e IndexEntry) int64
// NB: not passing the whole IndexEntry because of linter message:
// "hugeParam: e is heavy (88 bytes); consider passing it by pointer".
HashIndexEntry(
id ident.BytesID,
encodedTags ts.EncodedTags,
dataChecksum int64,
) int64
}

// IndexSummary stores a summary of an index entry to lookup.
@@ -43,7 +43,6 @@ import (
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"
xtime "github.com/m3db/m3/src/x/time"

"github.com/opentracing/opentracing-go"
@@ -58,7 +57,6 @@ const (
type newIteratorFn func(opts commitlog.IteratorOpts) (
iter commitlog.Iterator, corruptFiles []commitlog.ErrorWithPath, err error)
type snapshotFilesFn func(filePathPrefix string, namespace ident.ID, shard uint32) (fs.FileSetFilesSlice, error)
type newReaderFn func(bytesPool pool.CheckedBytesPool, opts fs.Options) (fs.DataFileSetReader, error)

type commitLogSource struct {
opts Options
@@ -70,7 +68,7 @@ type commitLogSource struct {

newIteratorFn newIteratorFn
snapshotFilesFn snapshotFilesFn
newReaderFn newReaderFn
newReaderFn fs.NewReaderFn

metrics commitLogSourceMetrics
// Cache the results of reading the commit log between passes. The commit log is not sharded by time range, so the
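Not part of the diff: a short sketch of why the exported fs.NewReaderFn type can replace the package-local newReaderFn alias removed above. It assumes fs.NewReader keeps a signature matching (pool.CheckedBytesPool, fs.Options) (fs.DataFileSetReader, error); the stub helper is hypothetical.

```go
package example

import (
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/pool"
)

// In production code the real constructor satisfies the shared type directly
// (assuming fs.NewReader has the matching signature).
var newReader fs.NewReaderFn = fs.NewReader

// In tests, any function with the same signature can be swapped in, for
// example a stub that returns a pre-built reader or mock.
func stubReader(reader fs.DataFileSetReader) fs.NewReaderFn {
	return func(_ pool.CheckedBytesPool, _ fs.Options) (fs.DataFileSetReader, error) {
		return reader, nil
	}
}
```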
2 changes: 1 addition & 1 deletion src/dbnode/storage/index.go
@@ -1386,7 +1386,7 @@ func (i *nsIndex) WideQuery(
opts,
)

// NB: result should be fina.ized here, regardless of outcome
// NB: result should be finalized here, regardless of outcome
// to prevent deadlocking while waiting on channel close.
defer results.Finalize()
queryOpts := opts.ToQueryOptions()