Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Decode ReadBits improvements #2197

Merged
merged 21 commits into from
Mar 11, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/dbnode/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func LeadingAndTrailingZeros(v uint64) (int, int) {
}

// SignExtend sign extends the highest bit of v which has numBits (<=64)
func SignExtend(v uint64, numBits int) int64 {
shift := uint(64 - numBits)
func SignExtend(v uint64, numBits uint) int64 {
shift := 64 - numBits
return (int64(v) << shift) >> shift
}
40 changes: 13 additions & 27 deletions src/dbnode/encoding/istream.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,8 @@ import (
// istream encapsulates a readable stream.
type istream struct {
r *bufio.Reader // encoded stream
err error // error encountered
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we removing the error? Shouldn't ReadBit still fail if the stream has failed?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This error is set from the result of readByteFromStream, but in all of the cases we call this, the parent call also returns that error (e.g. ReadBit). So I see no reason to keep the error stored as state on the stream object itself if we already would have returned it. Is there a reason to keep it, though, that I'm missing? The value of removing it is that we can then remove these if-conditions that are in multiple methods to check if the error is set.

Copy link
Collaborator

@arnikola arnikola Mar 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a noticeable difference on flame graphs? Branch prediction should help reduce the cost of these when there's no error

Mostly concerned about losing some future proofing if the actual impact turns out to be very small

Copy link
Collaborator Author

@rallen090 rallen090 Mar 9, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it is small but noticeable on the flamegraph. Most of ReadBits is attributable to readByte, and that increases when we remove this err checks (meaning less CPU work in the ReadBits func, as you can see on the far right of the ReadBits in the graphs).

BEFORE (readByte is 10.62%)
image

AFTER (readByte is 13.31%)
image

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd still lean on the side of caution here; otherwise there's a bit of a weird reliance that is.r.Byte() will error in the expected fashion otherwise, and if the underlying reader changes you may get weird behaviour

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok sounds good to me. I'll update to only include the casting changes.

current byte // current byte we are working off of
remaining int // bits remaining in current to be read
remaining uint // bits remaining in current to be read
}

// NewIStream creates a new Istream
Expand All @@ -41,9 +40,6 @@ func NewIStream(reader io.Reader, bufioSize int) IStream {

// ReadBit reads the next Bit
func (is *istream) ReadBit() (Bit, error) {
if is.err != nil {
return 0, is.err
}
if is.remaining == 0 {
if err := is.readByteFromStream(); err != nil {
return 0, err
Expand Down Expand Up @@ -77,9 +73,6 @@ func (is *istream) Read(b []byte) (int, error) {

// ReadByte reads the next Byte
func (is *istream) ReadByte() (byte, error) {
if is.err != nil {
return 0, is.err
}
remaining := is.remaining
res := is.consumeBuffer(remaining)
if remaining == 8 {
Expand All @@ -93,11 +86,7 @@ func (is *istream) ReadByte() (byte, error) {
}

// ReadBits reads the next Bits
func (is *istream) ReadBits(numBits int) (uint64, error) {
if is.err != nil {
return 0, is.err
}

func (is *istream) ReadBits(numBits uint) (uint64, error) {
var res uint64
for numBits >= 8 {
byteRead, err := is.ReadByte()
Expand All @@ -121,7 +110,7 @@ func (is *istream) ReadBits(numBits int) (uint64, error) {
if is.remaining < numToRead {
numToRead = is.remaining
}
bits := is.current >> uint(8-numToRead)
bits := is.current >> (8 - numToRead)
is.current <<= uint(numToRead)
is.remaining -= numToRead
res = (res << uint64(numToRead)) | uint64(bits)
Expand All @@ -131,10 +120,7 @@ func (is *istream) ReadBits(numBits int) (uint64, error) {
}

// PeekBits looks at the next Bits, but doesn't move the pos
func (is *istream) PeekBits(numBits int) (uint64, error) {
if is.err != nil {
return 0, is.err
}
func (is *istream) PeekBits(numBits uint) (uint64, error) {
// check the last byte first
if numBits <= is.remaining {
return uint64(readBitsInByte(is.current, numBits)), nil
Expand All @@ -152,39 +138,39 @@ func (is *istream) PeekBits(numBits int) (uint64, error) {
numBitsRead += 8
}
remainder := readBitsInByte(bytesRead[numBytesToRead-1], numBits-numBitsRead)
res = (res << uint(numBits-numBitsRead)) | uint64(remainder)
res = (res << (numBits - numBitsRead)) | uint64(remainder)
return res, nil
}

// RemainingBitsInCurrentByte returns the number of bits remaining to be read in
// the current byte.
func (is *istream) RemainingBitsInCurrentByte() int {
func (is *istream) RemainingBitsInCurrentByte() uint {
return is.remaining
}

// readBitsInByte reads numBits in byte b.
func readBitsInByte(b byte, numBits int) byte {
return b >> uint(8-numBits)
func readBitsInByte(b byte, numBits uint) byte {
return b >> (8 - numBits)
}

// consumeBuffer consumes numBits in is.current.
func (is *istream) consumeBuffer(numBits int) byte {
func (is *istream) consumeBuffer(numBits uint) byte {
res := readBitsInByte(is.current, numBits)
is.current <<= uint(numBits)
is.current <<= numBits
is.remaining -= numBits
return res
}

func (is *istream) readByteFromStream() error {
is.current, is.err = is.r.ReadByte()
var err error
is.current, err = is.r.ReadByte()
is.remaining = 8
return is.err
return err
}

// Reset resets the Istream
func (is *istream) Reset(r io.Reader) {
is.r.Reset(r)
is.err = nil
is.current = 0
is.remaining = 0
}
11 changes: 3 additions & 8 deletions src/dbnode/encoding/istream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestReadBits(t *testing.T) {

o := NewIStream(bytes.NewReader(byteStream), 16)
is := o.(*istream)
numBits := []int{1, 3, 4, 8, 7, 2, 64, 64}
numBits := []uint{1, 3, 4, 8, 7, 2, 64, 64}
var res []uint64
for _, v := range numBits {
read, err := is.ReadBits(v)
Expand All @@ -44,19 +44,17 @@ func TestReadBits(t *testing.T) {
}
expected := []uint64{0x1, 0x4, 0xa, 0xfe, 0x7e, 0x3, 0x1234567890abcdef, 0x1}
require.Equal(t, expected, res)
require.NoError(t, is.err)

_, err := is.ReadBits(8)
require.Error(t, err)
require.Error(t, is.err)
}

func TestPeekBitsSuccess(t *testing.T) {
byteStream := []byte{0xa9, 0xfe, 0xfe, 0xdf, 0x9b, 0x57, 0x21, 0xf1}
o := NewIStream(bytes.NewReader(byteStream), 16)
is := o.(*istream)
inputs := []struct {
numBits int
numBits uint
expected uint64
}{
{0, 0},
Expand All @@ -73,7 +71,6 @@ func TestPeekBitsSuccess(t *testing.T) {
require.NoError(t, err)
require.Equal(t, input.expected, res)
}
require.NoError(t, is.err)
require.Equal(t, byte(0), is.current)
require.Equal(t, 0, is.remaining)
}
Expand All @@ -98,7 +95,7 @@ func TestReadAfterPeekBits(t *testing.T) {
require.Error(t, err)

inputs := []struct {
numBits int
numBits uint
expected uint64
}{
{2, 0x2},
Expand All @@ -117,9 +114,7 @@ func TestResetIStream(t *testing.T) {
o := NewIStream(bytes.NewReader(nil), 16)
is := o.(*istream)
is.ReadBits(1)
require.Error(t, is.err)
is.Reset(bytes.NewReader(nil))
require.NoError(t, is.err)
require.Equal(t, byte(0), is.current)
require.Equal(t, 0, is.remaining)
}
4 changes: 2 additions & 2 deletions src/dbnode/encoding/m3tsz/float_encoder_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (eit *FloatEncoderAndIterator) readNextFloat(stream encoding.IStream) error
cb = (cb << 1) | nextCB
if cb == opcodeContainedValueXOR {
previousLeading, previousTrailing := encoding.LeadingAndTrailingZeros(eit.PrevXOR)
numMeaningfulBits := 64 - previousLeading - previousTrailing
numMeaningfulBits := uint(64 - previousLeading - previousTrailing)
meaningfulBits, err := stream.ReadBits(numMeaningfulBits)
if err != nil {
return err
Expand All @@ -153,7 +153,7 @@ func (eit *FloatEncoderAndIterator) readNextFloat(stream encoding.IStream) error
numLeadingZeros := (numLeadingZeroesAndNumMeaningfulBits & bits12To6Mask) >> 6
numMeaningfulBits := (numLeadingZeroesAndNumMeaningfulBits & bits6To0Mask) + 1

meaningfulBits, err := stream.ReadBits(int(numMeaningfulBits))
meaningfulBits, err := stream.ReadBits(uint(numMeaningfulBits))
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions src/dbnode/encoding/m3tsz/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ func (it *readerIterator) readIntValDiff() {
sign = 1.0
}

it.intVal += sign * float64(it.readBits(int(it.sig)))
it.intVal += sign * float64(it.readBits(uint(it.sig)))
}

func (it *readerIterator) readBits(numBits int) uint64 {
func (it *readerIterator) readBits(numBits uint) uint64 {
if !it.hasNext() {
return 0
}
Expand Down
44 changes: 26 additions & 18 deletions src/dbnode/encoding/m3tsz/timestamp_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,23 @@ type TimestampIterator struct {
// schemes. Setting SkipMarkers to true disables the look ahead behavior
// for situations where looking ahead is not safe.
SkipMarkers bool

numOpcodeBits int
numValueBits int
numBits uint
markerEncodingScheme encoding.MarkerEncodingScheme
}

// NewTimestampIterator creates a new TimestampIterator.
func NewTimestampIterator(opts encoding.Options, skipMarkers bool) TimestampIterator {
mes := opts.MarkerEncodingScheme()
return TimestampIterator{
Opts: opts,
SkipMarkers: skipMarkers,
Opts: opts,
SkipMarkers: skipMarkers,
numValueBits: mes.NumValueBits(),
numOpcodeBits: mes.NumOpcodeBits(),
numBits: uint(mes.NumOpcodeBits() + mes.NumValueBits()),
markerEncodingScheme: mes,
}
}

Expand Down Expand Up @@ -138,32 +148,30 @@ func (it *TimestampIterator) readNextTimestamp(stream encoding.IStream) error {
}

func (it *TimestampIterator) tryReadMarker(stream encoding.IStream) (time.Duration, bool, error) {
mes := it.Opts.MarkerEncodingScheme()
numBits := mes.NumOpcodeBits() + mes.NumValueBits()
opcodeAndValue, success := it.tryPeekBits(stream, numBits)
opcodeAndValue, success := it.tryPeekBits(stream, it.numBits)
if !success {
return 0, false, nil
}

opcode := opcodeAndValue >> uint(mes.NumValueBits())
if opcode != mes.Opcode() {
opcode := opcodeAndValue >> uint(it.numValueBits)
if opcode != it.markerEncodingScheme.Opcode() {
return 0, false, nil
}

var (
valueMask = (1 << uint(mes.NumValueBits())) - 1
valueMask = (1 << uint(it.numValueBits)) - 1
markerValue = int64(opcodeAndValue & uint64(valueMask))
)
switch encoding.Marker(markerValue) {
case mes.EndOfStream():
_, err := stream.ReadBits(numBits)
case it.markerEncodingScheme.EndOfStream():
_, err := stream.ReadBits(it.numBits)
if err != nil {
return 0, false, err
}
it.Done = true
return 0, true, nil
case mes.Annotation():
_, err := stream.ReadBits(numBits)
case it.markerEncodingScheme.Annotation():
_, err := stream.ReadBits(it.numBits)
if err != nil {
return 0, false, err
}
Expand All @@ -176,8 +184,8 @@ func (it *TimestampIterator) tryReadMarker(stream encoding.IStream) (time.Durati
return 0, false, err
}
return markerOrDOD, true, nil
case mes.TimeUnit():
_, err := stream.ReadBits(numBits)
case it.markerEncodingScheme.TimeUnit():
_, err := stream.ReadBits(it.numBits)
if err != nil {
return 0, false, err
}
Expand Down Expand Up @@ -249,12 +257,12 @@ func (it *TimestampIterator) readDeltaOfDelta(

cb = (cb << 1) | nextCB
if cb == buckets[i].Opcode() {
dodBits, err := stream.ReadBits(buckets[i].NumValueBits())
dodBits, err := stream.ReadBits(uint(buckets[i].NumValueBits()))
if err != nil {
return 0, err
}

dod := encoding.SignExtend(dodBits, buckets[i].NumValueBits())
dod := encoding.SignExtend(dodBits, uint(buckets[i].NumValueBits()))
timeUnit, err := it.TimeUnit.Value()
if err != nil {
return 0, nil
Expand All @@ -264,7 +272,7 @@ func (it *TimestampIterator) readDeltaOfDelta(
}
}

numValueBits := tes.DefaultBucket().NumValueBits()
numValueBits := uint(tes.DefaultBucket().NumValueBits())
dodBits, err := stream.ReadBits(numValueBits)
if err != nil {
return 0, err
Expand Down Expand Up @@ -312,7 +320,7 @@ func (it *TimestampIterator) readVarint(stream encoding.IStream) (int, error) {
return int(res), err
}

func (it *TimestampIterator) tryPeekBits(stream encoding.IStream, numBits int) (uint64, bool) {
func (it *TimestampIterator) tryPeekBits(stream encoding.IStream, numBits uint) (uint64, bool) {
res, err := stream.PeekBits(numBits)
if err != nil {
return 0, false
Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
const (
defaultDefaultTimeUnit = xtime.Second
defaultByteFieldDictLRUSize = 4
defaultIStreamReaderSizeM3TSZ = 16
defaultIStreamReaderSizeM3TSZ = 8 * 2
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also FYI I was playing around with this setting since it drives how large a buffer we keep in this stream. Seems like it does have some effects on the flamegraphs where the larger buffer avoids time going to the buffer fill as often. But the runtimes still seemed pretty variable so it didn't seem like a change we definitely want to make.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, yeah this might be worth playing around with - potentially in a way that only effects query instead of definitely dbnode as well.

defaultIStreamReaderSizeProto = 128
)

Expand Down
6 changes: 3 additions & 3 deletions src/dbnode/encoding/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,9 +315,9 @@ type IStream interface {
Read([]byte) (int, error)
ReadBit() (Bit, error)
ReadByte() (byte, error)
ReadBits(numBits int) (uint64, error)
PeekBits(numBits int) (uint64, error)
RemainingBitsInCurrentByte() int
ReadBits(numBits uint) (uint64, error)
PeekBits(numBits uint) (uint64, error)
RemainingBitsInCurrentByte() uint
Reset(r io.Reader)
}

Expand Down
2 changes: 1 addition & 1 deletion src/query/ts/m3db/encoded_step_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func benchmarkNextIteration(b *testing.B, iterations int, t iterType) {

if v := profilesTaken[key]; v == 2 {
p := profile.Start(profile.MemProfile)
defer p.Stop()
p.Stop()
rallen090 marked this conversation as resolved.
Show resolved Hide resolved
}

profilesTaken[key] = profilesTaken[key] + 1
Expand Down