Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(badger): making memory allocation to handle tags #1676

Merged
merged 3 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions badger/cmd/write_bench.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func writeSorted(db *badger.DB, num uint64) error {
writeRange := func(start, end uint64, streamId uint32) {
// end is not included.
defer wg.Done()
kvBuf := z.NewBuffer(5 << 20)
kvBuf := z.NewBuffer(5 << 20, "Benchmark.WriteSorted")
var sz int
for i := start; i < end; i++ {
key := make([]byte, 8)
Expand All @@ -229,7 +229,7 @@ func writeSorted(db *badger.DB, num uint64) error {

if sz >= 4<<20 { // 4 MB
writeCh <- kvBuf
kvBuf = z.NewBuffer(1 << 20)
kvBuf = z.NewBuffer(1 << 20, "Benchmark.WriteSorted")
sz = 0
}
}
Expand Down
2 changes: 1 addition & 1 deletion badger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func main() {
runtime.SetBlockProfileRate(100)
runtime.GOMAXPROCS(128)

out := z.CallocNoRef(1)
out := z.CallocNoRef(1, "Badger.Main")
fmt.Printf("jemalloc enabled: %v\n", len(out) > 0)
z.StatsPrint()
z.Free(out)
Expand Down
2 changes: 1 addition & 1 deletion db2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ func TestKeyCount(t *testing.T) {
}()

write := func(kvs *pb.KVList) error {
buf := z.NewBuffer(1 << 20)
buf := z.NewBuffer(1 << 20, "test")
defer buf.Release()

for _, kv := range kvs.Kv {
Expand Down
2 changes: 1 addition & 1 deletion db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2088,7 +2088,7 @@ func TestVerifyChecksum(t *testing.T) {
y.Check2(rand.Read(value))
st := 0

buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
for i := 0; i < 1000; i++ {
key := make([]byte, 8)
Expand Down
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@ go 1.12
require (
github.com/DataDog/zstd v1.4.1
github.com/cespare/xxhash v1.1.0
github.com/dgraph-io/ristretto v0.0.4-0.20210205182321-f8e4908e34d1
github.com/dgraph-io/ristretto v0.0.4-0.20210303184538-12d7dcf7c838
github.com/dustin/go-humanize v1.0.0
github.com/go-delve/delve v1.5.0 // indirect
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.3.1
github.com/golang/snappy v0.0.1
github.com/google/flatbuffers v1.12.0
github.com/google/go-cmp v0.5.4 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/mmcloughlin/avo v0.0.0-20201105074841-5d2f697d268f // indirect
github.com/pkg/errors v0.9.1
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cobra v0.0.5
github.com/stretchr/testify v1.4.0
github.com/twitchyliquid64/golang-asm v0.15.0 // indirect
go.opencensus.io v0.22.5
golang.org/x/net v0.0.0-20201021035429-f5854403a974
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/ristretto v0.0.4-0.20210205182321-f8e4908e34d1 h1:E+NH1+aTO3vgP/t01DvYbX8qnwBC0bPgPwK7y+y0vaE=
github.com/dgraph-io/ristretto v0.0.4-0.20210205182321-f8e4908e34d1/go.mod h1:tv2ec8nA7vRpSYX7/MbP52ihrUMXIHit54CQMq8npXQ=
github.com/dgraph-io/ristretto v0.0.4-0.20210303183749-186bd451575a h1:ns9JhUO4SjXPAsfCmZeX7PKYmbQCjeEWLdBmO8tCQYg=
aman-bansal marked this conversation as resolved.
Show resolved Hide resolved
github.com/dgraph-io/ristretto v0.0.4-0.20210303183749-186bd451575a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
github.com/dgraph-io/ristretto v0.0.4-0.20210303184538-12d7dcf7c838 h1:ShF3qLqrZEhsPtlWSIry3w35SBQA5/W1STDSp/b0nvk=
github.com/dgraph-io/ristretto v0.0.4-0.20210303184538-12d7dcf7c838/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo=
Expand Down
7 changes: 3 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
defer txn.Discard()

// produceKVs is running iterate serially. So, we can define the outList here.
outList := z.NewBuffer(2 * batchSize)
outList := z.NewBuffer(2 * batchSize, "Stream.ProduceKVs")
defer func() {
// The outList variable changes. So, we need to evaluate the variable in the defer. DO NOT
// call `defer outList.Release()`.
Expand All @@ -200,8 +200,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
itr.ThreadId = threadId
defer itr.Close()

itr.Alloc = z.NewAllocator(1 << 20)
itr.Alloc.Tag = "Stream.Iterate"
itr.Alloc = z.NewAllocator(1 << 20, "Stream.Iterate")
defer itr.Alloc.Release()

// This unique stream id is used to identify all the keys from this iteration.
Expand All @@ -211,7 +210,7 @@ func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
sendIt := func() error {
select {
case st.kvChan <- outList:
outList = z.NewBuffer(2 * batchSize)
outList = z.NewBuffer(2 * batchSize, "Stream.ProduceKVs")
atomic.AddUint64(&st.scanned, uint64(itr.scanned-scanned))
scanned = itr.scanned
case <-ctx.Done():
Expand Down
22 changes: 11 additions & 11 deletions stream_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
func getSortedKVList(valueSize, listSize int) *z.Buffer {
value := make([]byte, valueSize)
y.Check2(rand.Read(value))
buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
for i := 0; i < listSize; i++ {
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, uint64(i))
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestStreamWriter3(t *testing.T) {
value := make([]byte, valueSize)
y.Check2(rand.Read(value))
counter := 0
buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
for i := 0; i < noOfKeys; i++ {
key := make([]byte, 8)
Expand Down Expand Up @@ -272,7 +272,7 @@ func TestStreamWriter4(t *testing.T) {
require.NoError(t, err, "error while updating db")
}

buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
KVToBuffer(&pb.KV{
Key: []byte("key-1"),
Expand All @@ -297,7 +297,7 @@ func TestStreamWriter5(t *testing.T) {
right[0] = 0xff
copy(right[1:], []byte("break"))

buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
KVToBuffer(&pb.KV{
Key: left,
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestStreamWriter6(t *testing.T) {
// will be written to level 6, we need to insert at least 1 mb of data.
// Setting keycount below 32 would cause this test to fail.
keyCount := 40
buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
for i := range str {
for j := 0; j < keyCount; j++ {
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestStreamWriterCancel(t *testing.T) {
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
str := []string{"a", "a", "b", "b", "c", "c"}
ver := 1
buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
for i := range str {
kv := &pb.KV{
Expand Down Expand Up @@ -411,7 +411,7 @@ func TestStreamDone(t *testing.T) {
var val [10]byte
rand.Read(val[:])
for i := 0; i < 10; i++ {
buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
kv1 := &pb.KV{
Key: []byte(fmt.Sprintf("%d", i)),
Expand Down Expand Up @@ -452,7 +452,7 @@ func TestSendOnClosedStream(t *testing.T) {

var val [10]byte
rand.Read(val[:])
buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
kv1 := &pb.KV{
Key: []byte(fmt.Sprintf("%d", 1)),
Expand All @@ -475,7 +475,7 @@ func TestSendOnClosedStream(t *testing.T) {
require.NoError(t, db.Close())
}()
// Send once stream is closed.
buf1 := z.NewBuffer(10 << 20)
buf1 := z.NewBuffer(10 << 20, "test")
defer buf1.Release()
kv1 = &pb.KV{
Key: []byte(fmt.Sprintf("%d", 2)),
Expand All @@ -502,7 +502,7 @@ func TestSendOnClosedStream2(t *testing.T) {

var val [10]byte
rand.Read(val[:])
buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
kv1 := &pb.KV{
Key: []byte(fmt.Sprintf("%d", 1)),
Expand Down Expand Up @@ -549,7 +549,7 @@ func TestStreamWriterEncrypted(t *testing.T) {
key := []byte("mykey")
value := []byte("myvalue")

buf := z.NewBuffer(10 << 20)
buf := z.NewBuffer(10 << 20, "test")
defer buf.Release()
KVToBuffer(&pb.KV{
Key: key,
Expand Down
2 changes: 1 addition & 1 deletion table/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewTableBuilder(opts Options) *Builder {
sz = maxAllocatorInitialSz
}
b := &Builder{
alloc: opts.AllocPool.Get(sz),
alloc: opts.AllocPool.Get(sz, "TableBuilder"),
opts: &opts,
}
b.alloc.Tag = "Builder"
Expand Down
8 changes: 4 additions & 4 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ func (t *Table) decrypt(data []byte, viaCalloc bool) ([]byte, error) {

var dst []byte
if viaCalloc {
dst = z.Calloc(len(data))
dst = z.Calloc(len(data), "Table.Decrypt")
} else {
dst = make([]byte, len(data))
}
Expand Down Expand Up @@ -807,9 +807,9 @@ func (t *Table) decompress(b *block) error {
return nil
case options.Snappy:
if sz, err := snappy.DecodedLen(b.data); err == nil {
dst = z.Calloc(sz)
dst = z.Calloc(sz, "Table.Decompress")
} else {
dst = z.Calloc(len(b.data) * 4) // Take a guess.
dst = z.Calloc(len(b.data) * 4, "Table.Decompress") // Take a guess.
}
b.data, err = snappy.Decode(dst, b.data)
if err != nil {
Expand All @@ -818,7 +818,7 @@ func (t *Table) decompress(b *block) error {
}
case options.ZSTD:
sz := int(float64(t.opt.BlockSize) * 1.2)
dst = z.Calloc(sz)
dst = z.Calloc(sz, "Table.Decompress")
b.data, err = y.ZSTDDecompress(dst, b.data)
if err != nil {
z.Free(dst)
Expand Down
2 changes: 1 addition & 1 deletion y/y_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func TestEncodedSize(t *testing.T) {
}

func TestAllocatorReuse(t *testing.T) {
a := z.NewAllocator(1024)
a := z.NewAllocator(1024, "test")
defer a.Release()

N := 1024
Expand Down