Skip to content

Commit

Permalink
Replace integer compression in UID Pack with groupvariant alogorithm (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
animesh2049 authored Jul 4, 2019
1 parent 727b246 commit 6665444
Show file tree
Hide file tree
Showing 15 changed files with 833 additions and 264 deletions.
95 changes: 66 additions & 29 deletions codec/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,11 @@ package codec

import (
"bytes"
"encoding/binary"
"math"
"sort"

"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/dgryski/go-groupvarint"
)

type seekPos int
Expand All @@ -35,6 +34,10 @@ const (
SeekCurrent
)

var (
bitMask uint64 = 0xffffffff00000000
)

// Encoder is used to convert a list of UIDs into a pb.UidPack object.
type Encoder struct {
BlockSize int
Expand All @@ -46,18 +49,35 @@ func (e *Encoder) packBlock() {
if len(e.uids) == 0 {
return
}
block := &pb.UidBlock{Base: e.uids[0]}
block := &pb.UidBlock{Base: e.uids[0], NumUids: uint32(len(e.uids))}
last := e.uids[0]
e.uids = e.uids[1:]

count := 1
var out bytes.Buffer
var buf [binary.MaxVarintLen64]byte
for _, uid := range e.uids[1:] {
n := binary.PutUvarint(buf[:], uid-last)
x.Check2(out.Write(buf[:n]))
last = uid
count++
buf := make([]byte, 17)
tmpUids := make([]uint32, 4)
for {
for i := 0; i < 4; i++ {
if i >= len(e.uids) {
// Padding with '0' because Encode4 encodes only in batch of 4.
tmpUids[i] = 0
} else {
tmpUids[i] = uint32(e.uids[i] - last)
last = e.uids[i]
}
}

data := groupvarint.Encode4(buf, tmpUids)
out.Write(data)

// e.uids has ended and we have padded tmpUids with 0s
if len(e.uids) <= 4 {
e.uids = e.uids[:0]
break
}
e.uids = e.uids[4:]
}

block.Deltas = out.Bytes()
e.pack.Blocks = append(e.pack.Blocks, block)
}
Expand All @@ -67,6 +87,13 @@ func (e *Encoder) Add(uid uint64) {
if e.pack == nil {
e.pack = &pb.UidPack{BlockSize: uint32(e.BlockSize)}
}

size := len(e.uids)
if size > 0 && !match32MSB(e.uids[size-1], uid) {
e.packBlock()
e.uids = e.uids[:0]
}

e.uids = append(e.uids, uid)
if len(e.uids) >= e.BlockSize {
e.packBlock()
Expand Down Expand Up @@ -102,16 +129,28 @@ func (d *Decoder) unpackBlock() []uint64 {
last := block.Base
d.uids = append(d.uids, last)

// Read back the encoded varints.
var offset int
for offset < len(block.Deltas) {
delta, n := binary.Uvarint(block.Deltas[offset:])
x.AssertTrue(n > 0)
offset += n
uid := last + delta
d.uids = append(d.uids, uid)
last = uid
tmpUids := make([]uint32, 4)
var sum uint64
encData := block.Deltas

for uint32(len(d.uids)) < block.NumUids {
if len(encData) < 17 {
// Decode4 decodes 4 uids from encData. It moves slice(encData) forward while
// decoding and expects it to be of length >= 4 at all the stages. Padding
// with zero to make sure lenght is always >= 4.
encData = append(encData, 0, 0, 0)
}

groupvarint.Decode4(tmpUids, encData)
encData = encData[groupvarint.BytesUsed[encData[0]]:]
for i := 0; i < 4; i++ {
sum = last + uint64(tmpUids[i])
d.uids = append(d.uids, sum)
last = sum
}
}

d.uids = d.uids[:block.NumUids]
return d.uids
}

Expand Down Expand Up @@ -245,7 +284,7 @@ func Encode(uids []uint64, blockSize int) *pb.UidPack {
return enc.Done()
}

// ApproxLen returns an approximation of the number of UIDs in the pack. Can be used for int slice
// ApproxLen would indicate the total number of UIDs in the pack. Can be used for int slice
// allocations.
func ApproxLen(pack *pb.UidPack) int {
if pack == nil {
Expand All @@ -264,16 +303,10 @@ func ExactLen(pack *pb.UidPack) int {
if sz == 0 {
return 0
}
block := pack.Blocks[sz-1]
num := 1 // Count the base.
for _, b := range block.Deltas {
// If the MSB in varint encoding is zero, then it is the final byte, not a continuation of
// the integer. Thus, we can count it as one delta.
if b&0x80 == 0 {
num++
}
num := 0
for _, b := range pack.Blocks {
num += int(b.NumUids) // NumUids includes the base UID.
}
num += (sz - 1) * int(pack.BlockSize)
return num
}

Expand All @@ -288,3 +321,7 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {
}
return uids
}

func match32MSB(num1, num2 uint64) bool {
return (num1 & bitMask) == (num2 & bitMask)
}
41 changes: 38 additions & 3 deletions codec/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/binary"
"math"
"math/rand"
"sort"
"testing"
"time"

Expand Down Expand Up @@ -59,9 +60,6 @@ func TestUidPack(t *testing.T) {

expected := getUids(size)
pack := Encode(expected, 256)
for _, block := range pack.Blocks {
require.True(t, len(block.Deltas) <= 255)
}
require.Equal(t, len(expected), ExactLen(pack))
actual := Decode(pack, 0)
require.Equal(t, expected, actual)
Expand Down Expand Up @@ -108,6 +106,12 @@ func TestSeek(t *testing.T) {
require.Equal(t, tc.out, uids[0])
}
}

dec.blockIdx = 0
for i := 100; i < 10000; i += 100 {
uids := dec.LinearSeek(uint64(i))
require.Contains(t, uids, uint64(i))
}
}

func TestLinearSeek(t *testing.T) {
Expand Down Expand Up @@ -249,3 +253,34 @@ func benchmarkUidPackDecode(b *testing.B, blockSize int) {
_ = Decode(pack, 0)
}
}

func TestEncoding(t *testing.T) {
bigInts := make([]uint64, 5)
bigInts[0] = 0xf000000000000000
bigInts[1] = 0xf00f000000000000
bigInts[2] = 0x00f00f0000000000
bigInts[3] = 0x000f0f0000000000
bigInts[4] = 0x0f0f0f0f00000000

rand.Seed(time.Now().UnixNano())
var lengths = []int{0, 1, 2, 3, 5, 13, 18, 100, 99, 98}

for tc := 0; tc < len(lengths); tc++ {
ints := make([]uint64, lengths[tc])

for i := 0; i < 50 && i < lengths[tc]; i++ {
ints[i] = uint64(rand.Uint32())
}

for i := 50; i < lengths[tc]; i++ {
ints[i] = uint64(rand.Uint32()) + bigInts[rand.Intn(5)]
}

sort.Slice(ints, func(i, j int) bool { return ints[i] < ints[j] })

encodedInts := Encode(ints, 256)
decodedInts := Decode(encodedInts, 0)

require.Equal(t, ints, decodedInts)
}
}
12 changes: 12 additions & 0 deletions codec/decoderversion.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// +build !noasm
// +build amd64

package codec

import (
"github.com/golang/glog"
)

func init() {
glog.Infof("[Decoder]: Using assembly version of decoder")
}
5 changes: 5 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ message UidBlock {
// because when the PB is brought to memory, Go would always use 8-bytes per integer. Instead,
// storing it as a byte slice is a lot cheaper in memory.
bytes deltas = 2;
// num_uids is the number of UIDs in the block. We are including this because we want to
// swtich encoding to groupvarint encoding. Current avaialble open source version implements
// encoding and decoding for uint32. We want to wrap it around our logic to use it here.
// Default Blocksize is 256 so uint32 would be sufficient.
uint32 num_uids = 3;
}

message UidPack {
Expand Down
Loading

0 comments on commit 6665444

Please sign in to comment.