Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace integer compression in UID Pack with groupvariant alogorithm in Assembly #3527

Merged
merged 7 commits into from
Jul 4, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 66 additions & 29 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ package codec

import (
"bytes"
"encoding/binary"
"math"
"sort"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-groupvarint"
)

type seekPos int
Expand All @@ -35,6 +34,10 @@ const (
SeekCurrent
)

var (
bitMask uint64 = 0xffffffff00000000
)

// Encoder is used to convert a list of UIDs into a pb.UidPack object.
type Encoder struct {
BlockSize int
Expand All @@ -46,18 +49,35 @@ func (e *Encoder) packBlock() {
if len(e.uids) == 0 {
return
}
block := &pb.UidBlock{Base: e.uids[0]}
block := &pb.UidBlock{Base: e.uids[0], NumUids: uint32(len(e.uids))}
last := e.uids[0]
e.uids = e.uids[1:]

count := 1
var out bytes.Buffer
var buf [binary.MaxVarintLen64]byte
for _, uid := range e.uids[1:] {
n := binary.PutUvarint(buf[:], uid-last)
x.Check2(out.Write(buf[:n]))
last = uid
count++
buf := make([]byte, 17)
tmpUids := make([]uint32, 4)
for {
for i := 0; i < 4; i++ {
if i >= len(e.uids) {
// Padding with '0' because Encode4 encodes only in batch of 4.
tmpUids[i] = 0
} else {
tmpUids[i] = uint32(e.uids[i] - last)
last = e.uids[i]
}
}

data := groupvarint.Encode4(buf, tmpUids)
out.Write(data)

// e.uids has ended and we have padded tmpUids with 0s
if len(e.uids) <= 4 {
e.uids = e.uids[:0]
break
}
e.uids = e.uids[4:]
}

block.Deltas = out.Bytes()
e.pack.Blocks = append(e.pack.Blocks, block)
}
Expand All @@ -67,6 +87,13 @@ func (e *Encoder) Add(uid uint64) {
if e.pack == nil {
e.pack = &pb.UidPack{BlockSize: uint32(e.BlockSize)}
}

size := len(e.uids)
if size > 0 && !match32MSB(e.uids[size-1], uid) {
e.packBlock()
e.uids = e.uids[:0]
}

e.uids = append(e.uids, uid)
if len(e.uids) >= e.BlockSize {
e.packBlock()
Expand Down Expand Up @@ -102,16 +129,28 @@ func (d *Decoder) unpackBlock() []uint64 {
last := block.Base
d.uids = append(d.uids, last)

// Read back the encoded varints.
var offset int
for offset < len(block.Deltas) {
delta, n := binary.Uvarint(block.Deltas[offset:])
x.AssertTrue(n > 0)
offset += n
uid := last + delta
d.uids = append(d.uids, uid)
last = uid
tmpUids := make([]uint32, 4)
var sum uint64
encData := block.Deltas

for uint32(len(d.uids)) < block.NumUids {
if len(encData) < 17 {
// Decode4 decodes 4 uids from encData. It moves slice(encData) forward while
// decoding and expects it to be of length >= 4 at all the stages. Padding
// with zero to make sure lenght is always >= 4.
encData = append(encData, 0, 0, 0)
}

groupvarint.Decode4(tmpUids, encData)
encData = encData[groupvarint.BytesUsed[encData[0]]:]
for i := 0; i < 4; i++ {
sum = last + uint64(tmpUids[i])
d.uids = append(d.uids, sum)
last = sum
}
}

d.uids = d.uids[:block.NumUids]
return d.uids
}

Expand Down Expand Up @@ -245,7 +284,7 @@ func Encode(uids []uint64, blockSize int) *pb.UidPack {
return enc.Done()
}

// ApproxLen returns an approximation of the number of UIDs in the pack. Can be used for int slice
// ApproxLen would indicate the total number of UIDs in the pack. Can be used for int slice
// allocations.
func ApproxLen(pack *pb.UidPack) int {
if pack == nil {
Expand All @@ -264,16 +303,10 @@ func ExactLen(pack *pb.UidPack) int {
if sz == 0 {
return 0
}
block := pack.Blocks[sz-1]
num := 1 // Count the base.
for _, b := range block.Deltas {
// If the MSB in varint encoding is zero, then it is the final byte, not a continuation of
// the integer. Thus, we can count it as one delta.
if b&0x80 == 0 {
num++
}
num := 0
for _, b := range pack.Blocks {
num += int(b.NumUids) // NumUids includes the base UID.
}
num += (sz - 1) * int(pack.BlockSize)
return num
}

Expand All @@ -288,3 +321,7 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {
}
return uids
}

func match32MSB(num1, num2 uint64) bool {
return (num1 & bitMask) == (num2 & bitMask)
}
41 changes: 38 additions & 3 deletions codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/binary"
"math"
"math/rand"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -59,9 +60,6 @@ func TestUidPack(t *testing.T) {

expected := getUids(size)
pack := Encode(expected, 256)
for _, block := range pack.Blocks {
require.True(t, len(block.Deltas) <= 255)
}
require.Equal(t, len(expected), ExactLen(pack))
actual := Decode(pack, 0)
require.Equal(t, expected, actual)
Expand Down Expand Up @@ -108,6 +106,12 @@ func TestSeek(t *testing.T) {
require.Equal(t, tc.out, uids[0])
}
}

dec.blockIdx = 0
for i := 100; i < 10000; i += 100 {
uids := dec.LinearSeek(uint64(i))
require.Contains(t, uids, uint64(i))
}
}

func TestLinearSeek(t *testing.T) {
Expand Down Expand Up @@ -249,3 +253,34 @@ func benchmarkUidPackDecode(b *testing.B, blockSize int) {
_ = Decode(pack, 0)
}
}

func TestEncoding(t *testing.T) {
bigInts := make([]uint64, 5)
bigInts[0] = 0xf000000000000000
bigInts[1] = 0xf00f000000000000
bigInts[2] = 0x00f00f0000000000
bigInts[3] = 0x000f0f0000000000
bigInts[4] = 0x0f0f0f0f00000000

rand.Seed(time.Now().UnixNano())
var lengths = []int{0, 1, 2, 3, 5, 13, 18, 100, 99, 98}

for tc := 0; tc < len(lengths); tc++ {
ints := make([]uint64, lengths[tc])

for i := 0; i < 50 && i < lengths[tc]; i++ {
ints[i] = uint64(rand.Uint32())
}

for i := 50; i < lengths[tc]; i++ {
ints[i] = uint64(rand.Uint32()) + bigInts[rand.Intn(5)]
}

sort.Slice(ints, func(i, j int) bool { return ints[i] < ints[j] })

encodedInts := Encode(ints, 256)
decodedInts := Decode(encodedInts, 0)

require.Equal(t, ints, decodedInts)
}
}
12 changes: 12 additions & 0 deletions codec/decoderversion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// +build !noasm
// +build amd64

package codec

import (
"github.com/golang/glog"
)

func init() {
glog.Infof("[Decoder]: Using assembly version of decoder")
}
5 changes: 5 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ message UidBlock {
// because when the PB is brought to memory, Go would always use 8-bytes per integer. Instead,
// storing it as a byte slice is a lot cheaper in memory.
bytes deltas = 2;
// num_uids is the number of UIDs in the block. We are including this because we want to
// swtich encoding to groupvarint encoding. Current avaialble open source version implements
// encoding and decoding for uint32. We want to wrap it around our logic to use it here.
// Default Blocksize is 256 so uint32 would be sufficient.
uint32 num_uids = 3;
}

message UidPack {
Expand Down
Loading