Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbnode] Faster M3TSZ decoding by using 64 bit operations #2827

Merged
merged 39 commits into master from linasm/m3tsz-performance
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
2b6e823
BenchmarkM3TSZDecode
linasm Oct 31, 2020
9eed46a
istream64 + benchmark
linasm Nov 1, 2020
1ae4a01
Clean up benchmark code
linasm Nov 2, 2020
b98c2da
Improve tests
linasm Nov 2, 2020
568a0ca
Merge branch 'master' into linasm/m3tsz-performance
linasm Nov 2, 2020
85ad1dd
one less shift
vpranckaitis Nov 9, 2020
b1699b7
Merge branch 'master' into linasm/m3tsz-performance
linasm Nov 28, 2020
3eb6e55
Align with changes in master
linasm Nov 28, 2020
938f244
Replace IStream with IStream64
linasm Nov 29, 2020
c216a2a
git add
linasm Nov 29, 2020
7fcffe7
Merge branch 'master' into linasm/m3tsz-performance
linasm Nov 29, 2020
cc22a35
Fix read_data_files
linasm Nov 29, 2020
e42c761
lint
linasm Nov 29, 2020
4c44fda
TestSegmentReader64
linasm Nov 29, 2020
9d9644e
Merge branch 'master' into linasm/m3tsz-performance
linasm Nov 29, 2020
dd42c21
Fix disk_flush_helpers.go
linasm Nov 29, 2020
41e3e6c
Update benchmark result
linasm Nov 29, 2020
40f1dbb
Drop IStream interface
linasm Nov 29, 2020
1293407
Drop noop XOR with 0
linasm Nov 29, 2020
bd387d2
Fix test condition
linasm Nov 29, 2020
ca83b17
Copyrights
linasm Nov 29, 2020
a01bd71
Fix some more tests
linasm Nov 29, 2020
8530b9e
Address PR feedback
linasm Dec 1, 2020
9f0a12e
Merge branch 'master' into linasm/m3tsz-performance
linasm Dec 1, 2020
c02be87
Merge branch 'master' into linasm/m3tsz-performance
linasm Dec 5, 2020
ffe5e40
Apply some DRY
linasm Dec 5, 2020
6f2f3e3
Revert some changes
linasm Dec 5, 2020
7371c9a
Merge branch 'master' into linasm/m3tsz-performance
linasm Dec 5, 2020
c67dbab
Merge branch 'master' into linasm/m3tsz-performance
linasm Dec 7, 2020
3c41e12
Merge branch 'master' into linasm/m3tsz-performance
linasm Dec 26, 2020
624ab8d
Merge branch 'master' into linasm/m3tsz-performance
linasm Jan 7, 2021
9f1d3c1
Improve segment_reader_test
linasm Jan 9, 2021
ae38638
Merge branch 'linasm/m3tsz-performance' of github.com:m3db/m3 into linasm/m3tsz-performance
linasm Jan 9, 2021
ef7bb74
Merge branch 'master' into linasm/m3tsz-performance
linasm Jan 9, 2021
64d3e68
Merge branch 'master' into linasm/m3tsz-performance
linasm Feb 11, 2021
ccbc64c
Address PR feedback
linasm Feb 13, 2021
9172cf9
Merge branch 'master' into linasm/m3tsz-performance
linasm Feb 13, 2021
8b87db8
Only compute headTailLen when needed
linasm Feb 15, 2021
01fdfaf
Merge branch 'master' into linasm/m3tsz-performance
linasm Feb 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,17 @@
package parser

import (
"io"
"time"

"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/ts"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/query/models"
"github.com/m3db/m3/src/x/ident"
xtime "github.com/m3db/m3/src/x/time"
)

const sep rune = '!'
const tagSep rune = '.'

// Data is a set of datapoints.
type Data []ts.Datapoint

Expand All @@ -46,9 +41,7 @@ type IngestSeries struct {
Tags Tags
}

var iterAlloc = func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator {
return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions())
}
var iterAlloc = m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions())

func buildBlockReader(
block Data,
Expand Down
4 changes: 2 additions & 2 deletions src/cmd/tools/read_data_files/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package main

import (
"bytes"
"encoding/base64"
"fmt"
"io"
Expand All @@ -35,6 +34,7 @@ import (
"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/persist"
"github.com/m3db/m3/src/dbnode/persist/fs"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/ident"

"github.com/pborman/getopt"
Expand Down Expand Up @@ -159,7 +159,7 @@ func main() {
if benchMode != benchmarkSeries {
data.IncRef()

iter := m3tsz.NewReaderIterator(bytes.NewReader(data.Bytes()), true, encodingOpts)
iter := m3tsz.NewReaderIterator(xio.NewBytesReader64(data.Bytes()), true, encodingOpts)
for iter.Next() {
dp, _, annotation := iter.Current()
if benchMode == benchmarkNone {
Expand Down
11 changes: 1 addition & 10 deletions src/dbnode/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ package client
import (
"errors"
"fmt"
"io"
"time"

"github.com/m3db/m3/src/dbnode/encoding"
Expand All @@ -43,11 +42,6 @@ const (
asyncWriteWorkerPoolDefaultSize = 128
)

var (
errConfigurationMustSupplyConfig = errors.New(
"must supply config when no topology initializer parameter supplied")
)

// Configuration is a configuration that can be used to construct a client.
type Configuration struct {
// The environment (static or dynamic) configuration.
Expand Down Expand Up @@ -412,10 +406,7 @@ func (c Configuration) NewAdminClient(
encodingOpts = encoding.NewOptions()
}

v = v.SetReaderIteratorAllocate(func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator {
intOptimized := m3tsz.DefaultIntOptimizationEnabled
return m3tsz.NewReaderIterator(r, intOptimized, encodingOpts)
})
v = v.SetReaderIteratorAllocate(m3tsz.DefaultReaderIteratorAllocFn(encodingOpts))

if c.Proto != nil && c.Proto.Enabled {
v = v.SetEncodingProto(encodingOpts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package client

import (
"fmt"
"io"
"math/rand"
"os"
"sort"
Expand All @@ -32,7 +31,6 @@ import (
"github.com/m3db/m3/src/dbnode/encoding"
"github.com/m3db/m3/src/dbnode/encoding/m3tsz"
"github.com/m3db/m3/src/dbnode/generated/thrift/rpc"
"github.com/m3db/m3/src/dbnode/namespace"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/x/xpool"
"github.com/m3db/m3/src/x/ident"
Expand Down Expand Up @@ -272,9 +270,7 @@ func initTestFetchTaggedPools() *testFetchTaggedPools {
pools.readerSlices.Init()

pools.multiReader = encoding.NewMultiReaderIteratorPool(opts)
pools.multiReader.Init(func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator {
return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions())
})
pools.multiReader.Init(m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions()))

pools.seriesIter = encoding.NewSeriesIteratorPool(opts)
pools.seriesIter.Init()
Expand Down
11 changes: 6 additions & 5 deletions src/dbnode/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package client

import (
"errors"
"io"
"math"
"runtime"
"time"
Expand All @@ -37,6 +36,7 @@ import (
m3dbruntime "github.com/m3db/m3/src/dbnode/runtime"
"github.com/m3db/m3/src/dbnode/storage/index"
"github.com/m3db/m3/src/dbnode/topology"
"github.com/m3db/m3/src/dbnode/x/xio"
"github.com/m3db/m3/src/x/clock"
"github.com/m3db/m3/src/x/context"
"github.com/m3db/m3/src/x/ident"
Expand Down Expand Up @@ -473,16 +473,17 @@ func (o *options) Validate() error {

func (o *options) SetEncodingM3TSZ() Options {
opts := *o
opts.readerIteratorAllocate = func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator {
return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions())
}
opts.readerIteratorAllocate = m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions())
opts.isProtoEnabled = false
return &opts
}

func (o *options) SetEncodingProto(encodingOpts encoding.Options) Options {
opts := *o
opts.readerIteratorAllocate = func(r io.Reader, descr namespace.SchemaDescr) encoding.ReaderIterator {
opts.readerIteratorAllocate = func(
r xio.Reader64,
descr namespace.SchemaDescr,
) encoding.ReaderIterator {
return proto.NewIterator(r, descr, encodingOpts)
}
opts.isProtoEnabled = true
Expand Down
59 changes: 28 additions & 31 deletions src/dbnode/client/session_fetch_bulk_blocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"bytes"
"fmt"
"io"
"io/ioutil"
"math"
"sort"
"sync"
Expand Down Expand Up @@ -58,17 +57,19 @@ import (
)

var (
blockSize = 2 * time.Hour
nsID = ident.StringID("testNs1")
nsRetentionOpts = retention.NewOptions().
SetBlockSize(blockSize).
SetRetentionPeriod(48 * blockSize)
blockSize = 2 * time.Hour
nsID = ident.StringID("testNs1")

nsRetentionOpts = retention.NewOptions().SetBlockSize(blockSize).SetRetentionPeriod(48 * blockSize)

testTagDecodingPool = serialize.NewTagDecoderPool(
serialize.NewTagDecoderOptions(serialize.TagDecoderOptionsConfig{}),
pool.NewObjectPoolOptions().SetSize(1))

testTagEncodingPool = serialize.NewTagEncoderPool(
serialize.NewTagEncoderOptions(),
pool.NewObjectPoolOptions().SetSize(1))

testIDPool = newSessionTestOptions().IdentifierPool()
fooID = ident.StringID("foo")
fooTags checked.Bytes
Expand Down Expand Up @@ -101,9 +102,7 @@ func testsNsMetadata(t *testing.T) namespace.Metadata {

func newSessionTestMultiReaderIteratorPool() encoding.MultiReaderIteratorPool {
p := encoding.NewMultiReaderIteratorPool(nil)
p.Init(func(r io.Reader, _ namespace.SchemaDescr) encoding.ReaderIterator {
return m3tsz.NewReaderIterator(r, m3tsz.DefaultIntOptimizationEnabled, encoding.NewOptions())
})
p.Init(m3tsz.DefaultReaderIteratorAllocFn(encoding.NewOptions()))
return p
}

Expand Down Expand Up @@ -1455,10 +1454,9 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockErr(t *testing.T) {
require.True(t, ok)
segment, err := reader.Segment()
require.NoError(t, err)
rawBlockData := make([]byte, segment.Len())
n, err := reader.Read(rawBlockData)
require.NoError(t, err)
require.Equal(t, len(rawBlockData), n)
rawBlockData, err := xio.ToBytes(reader)
require.Equal(t, io.EOF, err)
require.Equal(t, len(rawBlockData), segment.Len())
rawBlockLen := int64(len(rawBlockData))

var (
Expand Down Expand Up @@ -1510,25 +1508,25 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockErr(t *testing.T) {
Return(&rpc.FetchBlocksRawResult_{
Elements: []*rpc.Blocks{
// First foo block intact
&rpc.Blocks{ID: []byte("foo"), Blocks: []*rpc.Block{
&rpc.Block{Start: start.UnixNano(), Segments: &rpc.Segments{
{ID: []byte("foo"), Blocks: []*rpc.Block{
{Start: start.UnixNano(), Segments: &rpc.Segments{
Merged: &rpc.Segment{
Head: rawBlockData[:len(rawBlockData)-1],
Tail: []byte{rawBlockData[len(rawBlockData)-1]},
},
}},
}},
// First bar block intact, second with error
&rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{
&rpc.Block{Start: start.UnixNano(), Segments: &rpc.Segments{
{ID: []byte("bar"), Blocks: []*rpc.Block{
{Start: start.UnixNano(), Segments: &rpc.Segments{
Merged: &rpc.Segment{
Head: rawBlockData[:len(rawBlockData)-1],
Tail: []byte{rawBlockData[len(rawBlockData)-1]},
},
}},
}},
&rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{
&rpc.Block{Start: start.Add(blockSize).UnixNano(), Err: &rpc.Error{
{ID: []byte("bar"), Blocks: []*rpc.Block{
{Start: start.Add(blockSize).UnixNano(), Err: &rpc.Error{
Type: rpc.ErrorType_INTERNAL_ERROR,
Message: "an error",
}},
Expand Down Expand Up @@ -1606,10 +1604,9 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockChecksum(t *testing.T) {
require.True(t, ok)
segment, err := reader.Segment()
require.NoError(t, err)
rawBlockData := make([]byte, segment.Len())
n, err := reader.Read(rawBlockData)
require.NoError(t, err)
require.Equal(t, len(rawBlockData), n)
rawBlockData, err := xio.ToBytes(reader)
require.Equal(t, io.EOF, err)
require.Equal(t, len(rawBlockData), segment.Len())
rawBlockLen := int64(len(rawBlockData))

var (
Expand Down Expand Up @@ -1666,26 +1663,26 @@ func TestStreamBlocksBatchFromPeerVerifiesBlockChecksum(t *testing.T) {
Return(&rpc.FetchBlocksRawResult_{
Elements: []*rpc.Blocks{
// valid foo block
&rpc.Blocks{ID: []byte("foo"), Blocks: []*rpc.Block{
&rpc.Block{Start: start.UnixNano(), Checksum: &validChecksum, Segments: &rpc.Segments{
{ID: []byte("foo"), Blocks: []*rpc.Block{
{Start: start.UnixNano(), Checksum: &validChecksum, Segments: &rpc.Segments{
Merged: &rpc.Segment{
Head: head,
Tail: tail,
},
}},
}},
&rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{
{ID: []byte("bar"), Blocks: []*rpc.Block{
// invalid bar block
&rpc.Block{Start: start.UnixNano(), Checksum: &invalidChecksum, Segments: &rpc.Segments{
{Start: start.UnixNano(), Checksum: &invalidChecksum, Segments: &rpc.Segments{
Merged: &rpc.Segment{
Head: head,
Tail: tail,
},
}},
}},
&rpc.Blocks{ID: []byte("bar"), Blocks: []*rpc.Block{
{ID: []byte("bar"), Blocks: []*rpc.Block{
// valid bar block, no checksum
&rpc.Block{Start: start.Add(blockSize).UnixNano(), Segments: &rpc.Segments{
{Start: start.Add(blockSize).UnixNano(), Segments: &rpc.Segments{
Merged: &rpc.Segment{
Head: head,
Tail: tail,
Expand Down Expand Up @@ -1769,8 +1766,8 @@ func TestBlocksResultAddBlockFromPeerReadMerged(t *testing.T) {
require.NoError(t, err)

// Assert block has data
data, err := ioutil.ReadAll(xio.NewSegmentReader(seg))
require.NoError(t, err)
data, err := xio.ToBytes(xio.NewSegmentReader(seg))
require.Equal(t, io.EOF, err)
assert.Equal(t, []byte{1, 2, 3}, data)
}

Expand Down
2 changes: 1 addition & 1 deletion src/dbnode/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func LeadingAndTrailingZeros(v uint64) (int, int) {
}

// SignExtend sign extends the highest bit of v which has numBits (<=64).
func SignExtend(v uint64, numBits uint) int64 {
func SignExtend(v uint64, numBits uint8) int64 {
shift := 64 - numBits
return (int64(v) << shift) >> shift
}
Loading