[dbnode] Use StreamingReadMetadata for bootstrapping #2938

Merged
merged 25 commits on Jan 25, 2021

Changes from 15 commits

Commits
7926f83
[dbnode] Use StreamingReadMetadata for bootstrapping
linasm Nov 22, 2020
345d2e9
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Nov 22, 2020
ae4cc29
Formatting
linasm Nov 22, 2020
9436c18
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Nov 24, 2020
41df0f4
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Nov 25, 2020
771481a
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Nov 29, 2020
115fee9
lint
linasm Nov 29, 2020
2cb1132
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Nov 30, 2020
1ffa8dd
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Dec 7, 2020
10b6229
Lint
linasm Dec 7, 2020
3a1de0a
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Dec 8, 2020
7e2f9bf
Address PR feedback
linasm Dec 9, 2020
b460ac7
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Dec 9, 2020
c09a057
nit
linasm Dec 10, 2020
a7b2aad
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Dec 10, 2020
254e41d
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
arnikola Dec 10, 2020
b26f7d1
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Dec 13, 2020
319ae71
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Dec 26, 2020
94fa358
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
vpranckaitis Jan 20, 2021
ba1c575
post-merge cleanup and fixes
vpranckaitis Jan 20, 2021
3fe0419
remove left-over code
vpranckaitis Jan 21, 2021
2de7902
accept nil encoded tags
vpranckaitis Jan 21, 2021
afa3a96
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
vpranckaitis Jan 22, 2021
f2846c5
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
vpranckaitis Jan 25, 2021
7fb067d
Merge branch 'master' into linasm/bootstrap-streaming-read-metadata
linasm Jan 25, 2021
2 changes: 1 addition & 1 deletion .golangci.yml
@@ -222,7 +222,7 @@ linters:
- godox
# New line required before return would require a large fraction of the
# code base to need updating, it's not worth the perceived benefit.
- nlreturn
- nlreturn
# Opinionated and sometimes wrong.
- paralleltest
disable-all: false
15 changes: 15 additions & 0 deletions src/dbnode/persist/fs/fs_mock.go

Some generated files are not rendered by default.

55 changes: 33 additions & 22 deletions src/dbnode/persist/fs/read.go
@@ -53,9 +53,6 @@ var (

errUnexpectedSortByOffset = errors.New("should not sort index by offsets when doing reads sorted by id")

// errReadMetadataOptimizedForRead returned when we optimized for only reading metadata but are attempting a regular read
errReadMetadataOptimizedForRead = errors.New("read metadata optimized for regular read")

errStreamingRequired = errors.New("streaming must be enabled for streaming read methods")
errStreamingUnsupported = errors.New("streaming must be disabled for non streaming read methods")
)
@@ -112,10 +109,6 @@ type reader struct {
volume int
open bool
streamingEnabled bool
// NB(bodu): Informs whether or not we optimize for only reading
// metadata. We don't need to sort for reading metadata but sorting is
// required if we are performing regular reads.
optimizedReadMetadataOnly bool
}

// NewReader returns a new reader and expects all files to exist. Will read the
@@ -292,7 +285,6 @@ func (r *reader) Open(opts DataReaderOpenOptions) error {
r.open = true
r.namespace = namespace
r.shard = shard
r.optimizedReadMetadataOnly = opts.OptimizedReadMetadataOnly

return nil
}
@@ -363,12 +355,10 @@ func (r *reader) readIndexAndSortByOffsetAsc() error {
}
r.indexEntriesByOffsetAsc = append(r.indexEntriesByOffsetAsc, entry)
}
// This is false by default so we always sort unless otherwise specified.
if !r.optimizedReadMetadataOnly {
// NB(r): As we decode each block we need access to each index entry
// in the order we decode the data. This is only required for regular reads.
sort.Sort(indexEntriesByOffsetAsc(r.indexEntriesByOffsetAsc))
}
// NB(r): As we decode each block we need access to each index entry
// in the order we decode the data. This is only required for regular reads.
sort.Sort(indexEntriesByOffsetAsc(r.indexEntriesByOffsetAsc))

return nil
}

@@ -405,25 +395,19 @@ func (r *reader) StreamingRead() (StreamedDataEntry, error) {

r.entriesRead++

dataEntry := StreamedDataEntry{
return StreamedDataEntry{
ID: r.streamingID,
EncodedTags: r.streamingTags,
Data: r.streamingData,
DataChecksum: uint32(entry.DataChecksum),
}

return dataEntry, nil
}, nil
}

func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
if r.streamingEnabled {
return nil, nil, nil, 0, errStreamingUnsupported
}

// NB(bodu): We cannot perform regular reads if we're optimizing for only reading metadata.
if r.optimizedReadMetadataOnly {
return nil, nil, nil, 0, errReadMetadataOptimizedForRead
}
if r.entries > 0 && len(r.indexEntriesByOffsetAsc) < r.entries {
// Have not read the index yet, this is required when reading
// data as we need each index entry in order by the offset ascending
@@ -465,6 +449,33 @@ func (r *reader) Read() (ident.ID, ident.TagIterator, checked.Bytes, uint32, error) {
return id, tags, data, uint32(entry.DataChecksum), nil
}

func (r *reader) StreamingReadMetadata() (StreamedMetadataEntry, error) {
if !r.streamingEnabled {
return StreamedMetadataEntry{}, errStreamingRequired
}

if r.metadataRead >= r.entries {
return StreamedMetadataEntry{}, io.EOF
}

entry, err := r.decoder.DecodeIndexEntry(nil)
if err != nil {
return StreamedMetadataEntry{}, err
}

r.streamingID = append(r.streamingID[:0], entry.ID...)
r.streamingTags = append(r.streamingTags[:0], entry.EncodedTags...)
Comment on lines +466 to +467

Collaborator: Will these compete with StreamingRead also mutating these fields?

Collaborator (Author): No. At least for now (and most probably in the future), data and metadata reads are mutually exclusive, as they were before the introduction of streaming. This is documented in the method comments on the interface.
r.metadataRead++

return StreamedMetadataEntry{
ID: r.streamingID,
EncodedTags: r.streamingTags,
Length: int(entry.Size),
DataChecksum: uint32(entry.DataChecksum),
}, nil
}

func (r *reader) ReadMetadata() (ident.ID, ident.TagIterator, int, uint32, error) {
if r.streamingEnabled {
return nil, nil, 0, 0, errStreamingUnsupported
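For context, here is a minimal caller-side sketch of the new metadata path. It is not part of this diff: the `consumeMetadata` function and its `process` callback are illustrative only. It assumes a reader opened with `DataReaderOpenOptions{StreamingEnabled: true}`, and shows the io.EOF termination and the buffer-reuse caveat raised in the review thread above.

```go
package example

import (
	"errors"
	"io"

	"github.com/m3db/m3/src/dbnode/persist/fs"
)

// consumeMetadata drains one volume through StreamingReadMetadata.
// The reader must have been opened with StreamingEnabled: true;
// otherwise the call returns errStreamingRequired.
func consumeMetadata(
	reader fs.DataFileSetReader,
	process func(fs.StreamedMetadataEntry),
) error {
	for {
		entry, err := reader.StreamingReadMetadata()
		if errors.Is(err, io.EOF) {
			return nil // end of volume
		}
		if err != nil {
			return err
		}
		// entry.ID and entry.EncodedTags alias buffers that the reader
		// reuses on the next call, so process must consume or copy them
		// before the loop iterates.
		process(entry)
	}
}
```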
47 changes: 32 additions & 15 deletions src/dbnode/persist/fs/read_write_test.go
@@ -31,6 +31,7 @@ import (

"github.com/m3db/m3/src/dbnode/digest"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/x/checked"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
@@ -196,11 +197,6 @@ func readTestDataWithStreamingOpt(
streamingEnabled bool,
) {
for _, underTest := range readTestTypes {
if underTest == readTestTypeMetadata && streamingEnabled {
// ATM there is no streaming support for metadata.
continue
}

rOpenOpts := DataReaderOpenOptions{
Identifier: FileSetFileIdentifier{
Namespace: testNs1ID,
@@ -263,7 +259,7 @@
data.Finalize()

case readTestTypeMetadata:
id, tags, length, checksum, err := r.ReadMetadata()
id, tags, length, checksum, err := readMetadata(t, r)
require.NoError(t, err)

// Assert id
@@ -571,26 +567,47 @@ func TestWriterOnlyWritesNonNilBytes(t *testing.T) {
})
}

func readData(t *testing.T, reader DataFileSetReader) (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error) {
func readData(
t *testing.T,
reader DataFileSetReader,
) (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error) {
if reader.StreamingEnabled() {
entry, err := reader.StreamingRead()
if err != nil {
return nil, nil, nil, 0, err
}
var tags = ident.EmptyTagIterator
if len(entry.EncodedTags) > 0 {
tagsDecoder := testTagDecoderPool.Get()
tagsDecoder.Reset(checkedBytes(entry.EncodedTags))
require.NoError(t, tagsDecoder.Err())
tags = tagsDecoder
}

tags := decodeTags(t, entry.EncodedTags)
return entry.ID, tags, checked.NewBytes(entry.Data, nil), entry.DataChecksum, err
}

return reader.Read()
}

func readMetadata(
t *testing.T,
reader DataFileSetReader,
) (id ident.ID, tags ident.TagIterator, length int, checksum uint32, err error) {
if reader.StreamingEnabled() {
entry, err := reader.StreamingReadMetadata()
tags := decodeTags(t, entry.EncodedTags)
return entry.ID, tags, entry.Length, entry.DataChecksum, err
}

return reader.ReadMetadata()
}

func decodeTags(t *testing.T, encodedTags ts.EncodedTags) ident.TagIterator {
tags := ident.EmptyTagIterator
if len(encodedTags) > 0 {
tagsDecoder := testTagDecoderPool.Get()
tagsDecoder.Reset(checkedBytes(encodedTags))
require.NoError(t, tagsDecoder.Err())
tags = tagsDecoder
}

return tags
}

func checkedBytes(b []byte) checked.Bytes {
r := checked.NewBytes(b, nil)
r.IncRef()
13 changes: 9 additions & 4 deletions src/dbnode/persist/fs/reader_open_options_matcher.go
@@ -28,8 +28,9 @@ import (

// ReaderOpenOptionsMatcher is a matcher for the DataReaderOpenOptions struct
type ReaderOpenOptionsMatcher struct {
ID FileSetFileIdentifier
FileSetType persist.FileSetType
ID FileSetFileIdentifier
FileSetType persist.FileSetType
StreamingEnabled bool
}

// Matches determines whether m matches a DataReaderOpenOptions
@@ -54,13 +55,17 @@ func (m ReaderOpenOptionsMatcher) Matches(x interface{}) bool {
if m.FileSetType != readerOpenOptions.FileSetType {
return false
}
if m.StreamingEnabled != readerOpenOptions.StreamingEnabled {
return false
}

return true
}

func (m ReaderOpenOptionsMatcher) String() string {
return fmt.Sprintf(
"namespace: %s, shard: %d, blockstart: %d, volumeIndex: %d, filesetType: %s",
m.ID.Namespace.String(), m.ID.Shard, m.ID.BlockStart.Unix(), m.ID.VolumeIndex, m.FileSetType,
"namespace: %s, shard: %d, blockstart: %d, volumeIndex: %d, filesetType: %s, streamingEnabled: %t", // nolint: lll
m.ID.Namespace.String(), m.ID.Shard, m.ID.BlockStart.Unix(), m.ID.VolumeIndex,
m.FileSetType, m.StreamingEnabled,
)
}
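A sketch of how the extended matcher might be exercised in a test. This is not from the PR itself: it assumes the generated MockDataFileSetReader from fs_mock.go, and the namespace, shard, and block start values are illustrative.

```go
package example

import (
	"testing"
	"time"

	"github.com/golang/mock/gomock"
	"github.com/stretchr/testify/require"

	"github.com/m3db/m3/src/dbnode/persist"
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/x/ident"
)

func TestOpensReaderInStreamingMode(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	blockStart := time.Date(2021, 1, 25, 0, 0, 0, 0, time.UTC)
	id := fs.FileSetFileIdentifier{
		Namespace:  ident.StringID("testNs"),
		Shard:      42,
		BlockStart: blockStart,
	}

	reader := fs.NewMockDataFileSetReader(ctrl)
	// StreamingEnabled is now part of the match, so a non-streaming
	// Open call would fail this expectation.
	reader.EXPECT().Open(fs.ReaderOpenOptionsMatcher{
		ID:               id,
		FileSetType:      persist.FileSetFlushType,
		StreamingEnabled: true,
	}).Return(nil)

	// Stand-in for the code under test.
	require.NoError(t, reader.Open(fs.DataReaderOpenOptions{
		Identifier:       id,
		FileSetType:      persist.FileSetFlushType,
		StreamingEnabled: true,
	}))
}
```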
39 changes: 27 additions & 12 deletions src/dbnode/persist/fs/types.go
@@ -127,12 +127,9 @@ type DataReaderOpenOptions struct {
Identifier FileSetFileIdentifier
// FileSetType is the file set type.
FileSetType persist.FileSetType
// StreamingEnabled enables using streaming methods, such as DataFileSetReader.StreamingRead.
// StreamingEnabled enables using streaming methods, such as
// DataFileSetReader.StreamingRead and DataFileSetReader.StreamingReadMetadata.
StreamingEnabled bool
// NB(bodu): This option can inform the reader to optimize for reading
// only metadata by not sorting index entries. Setting this option will
// throw an error if a regular `Read()` is attempted.
OptimizedReadMetadataOnly bool
}

// DataFileSetReader provides an unsynchronized reader for a TSDB file set.
@@ -145,18 +142,27 @@ type DataFileSetReader interface {
// Status returns the status of the reader
Status() DataFileSetReaderStatus

// Read returns the next id, tags, data, checksum tuple or error, will return io.EOF at end of volume.
// Use either Read or ReadMetadata to progress through a volume, but not both.
// Note: make sure to finalize the ID, close the Tags and finalize the Data when done with
// them so they can be returned to their respective pools.
Read() (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error)

// StreamingRead returns the next unpooled id, encodedTags, data, checksum
// values ordered by id, or error; will return io.EOF at end of volume.
// Can only be used when DataReaderOpenOptions.StreamingEnabled is true.
// Use either StreamingRead or StreamingReadMetadata to progress through a volume, but not both.
// Note: the returned data gets invalidated on the next call to StreamingRead.
StreamingRead() (StreamedDataEntry, error)

// StreamingReadMetadata returns the next unpooled id, encodedTags, length, checksum
// values ordered by id, or error; will return io.EOF at end of volume.
// Can only be used when DataReaderOpenOptions.StreamingEnabled is true.
// Use either StreamingRead or StreamingReadMetadata to progress through a volume, but not both.
// Note: the returned data gets invalidated on the next call to StreamingReadMetadata.
StreamingReadMetadata() (StreamedMetadataEntry, error)

// Read returns the next id, tags, data, checksum tuple or error,
// will return io.EOF at end of volume.
// Use either Read or ReadMetadata to progress through a volume, but not both.
// Note: make sure to finalize the ID, close the Tags and finalize the Data when done with
// them so they can be returned to their respective pools.
Read() (id ident.ID, tags ident.TagIterator, data checked.Bytes, checksum uint32, err error)

// ReadMetadata returns the next id and metadata or error, will return io.EOF at end of volume.
// Use either Read or ReadMetadata to progress through a volume, but not both.
// Note: make sure to finalize the ID, and close the Tags when done with them so they can
@@ -678,7 +684,7 @@ type IndexClaimsManager interface {
) (int, error)
}

// StreamedDataEntry contains the data of a single entry returned by streaming method(s).
// StreamedDataEntry contains the data of a single entry returned by streaming method.
// The underlying data slices are reused and invalidated on every read.
type StreamedDataEntry struct {
ID ident.BytesID
@@ -687,6 +693,15 @@ type StreamedDataEntry struct {
DataChecksum uint32
}

// StreamedMetadataEntry contains the metadata of a single entry returned by streaming method.
// The underlying data slices are reused and invalidated on every read.
type StreamedMetadataEntry struct {
ID ident.BytesID
EncodedTags ts.EncodedTags
Length int
DataChecksum uint32
}

// NewReaderFn creates a new DataFileSetReader.
type NewReaderFn func(bytesPool pool.CheckedBytesPool, opts Options) (DataFileSetReader, error)

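Because both streamed entry types alias buffers that are reused and invalidated on every read, a caller that holds an entry across reads must deep-copy it first. A minimal sketch of that copy, assuming the types above (the retain helper is illustrative, not part of this diff):

```go
package example

import (
	"github.com/m3db/m3/src/dbnode/persist/fs"
	"github.com/m3db/m3/src/dbnode/ts"
	"github.com/m3db/m3/src/x/ident"
)

// retain deep-copies a StreamedMetadataEntry so it remains valid after
// the next call to StreamingReadMetadata invalidates the shared buffers.
func retain(entry fs.StreamedMetadataEntry) fs.StreamedMetadataEntry {
	return fs.StreamedMetadataEntry{
		ID:           append(ident.BytesID(nil), entry.ID...),
		EncodedTags:  append(ts.EncodedTags(nil), entry.EncodedTags...),
		Length:       entry.Length,
		DataChecksum: entry.DataChecksum,
	}
}
```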