Skip to content

Commit

Permalink
feat: add calculateTableSize and its unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Dec 28, 2021
1 parent 69ebc9d commit 23cd56d
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 9 deletions.
1 change: 0 additions & 1 deletion client/dmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,6 @@ func (c *Client) processIncrDecrResponse(resp protocol.EncodeDecoder) (int, erro
func (c *Client) incrDecr(op protocol.OpCode, name, key string, delta int) (int, error) {
value, err := c.serializer.Marshal(delta)
if err != nil {
fmt.Println(delta, err)
return 0, err
}
req := protocol.NewDMapMessage(op)
Expand Down
2 changes: 1 addition & 1 deletion internal/kvstore/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (kv *KVStore) Compaction() (bool, error) {
err := latest.putRaw(hkey, entry)
if errors.Is(err, errNotEnoughSpace) {
// Create a new table and put the new k/v pair in it.
ntSize := kv.Stats().Inuse*2 + len(entry) + defaultTableSize
ntSize := kv.calculateTableSize(len(entry))
nt := newTable(ntSize)
kv.tables = append(kv.tables, nt)
return false, nil
Expand Down
49 changes: 42 additions & 7 deletions internal/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package kvstore

import (
"errors"
"fmt"
"log"
"regexp"

Expand All @@ -35,8 +36,9 @@ const (
// keep metadata and mmap syscall for allocating memory to store values.
// The allocated memory is not a subject of Golang's GC.
type KVStore struct {
tables []*table
config *storage.Config
minimumTableSize int
tables []*table
config *storage.Config
}

var _ storage.Engine = (*KVStore)(nil)
Expand Down Expand Up @@ -76,10 +78,16 @@ func (kv *KVStore) Fork(c *storage.Config) (storage.Engine, error) {
if err != nil {
return nil, err
}

if _, ok := size.(int); !ok {
return nil, fmt.Errorf("tableSize is %T, not int", size)
}

child := &KVStore{
config: c,
config: c,
minimumTableSize: size.(int),
}
t := newTable(size.(int))
t := newTable(child.calculateTableSize(0))
child.tables = append(child.tables, t)
return child, nil
}
Expand All @@ -105,7 +113,7 @@ func (kv *KVStore) PutRaw(hkey uint64, value []byte) error {
err := t.putRaw(hkey, value)
if err == errNotEnoughSpace {
// Create a new table and put the new k/v pair in it.
ntSize := kv.Stats().Inuse*2 + len(value) + defaultTableSize
ntSize := kv.calculateTableSize(len(value))
nt := newTable(ntSize)
kv.tables = append(kv.tables, nt)
res = storage.ErrFragmented
Expand All @@ -121,6 +129,33 @@ func (kv *KVStore) PutRaw(hkey uint64, value []byte) error {
return res
}

func (kv *KVStore) calculateTableSize(minimum int) int {
s := kv.Stats()
if s.Inuse == 0 {
// Value size is too big, and we are trying to allocate a new table to insert it into
if kv.minimumTableSize <= minimum {
return kv.minimumTableSize + minimum
}

// Fresh table
return kv.minimumTableSize
}

doubledInuse := s.Inuse * 2

// Minimum table size is kv.minimumTableSize
if doubledInuse <= kv.minimumTableSize {
return kv.minimumTableSize
}

if doubledInuse <= minimum {
return doubledInuse + minimum
}

// Expand the table.
return doubledInuse
}

// Put sets the value for the given key. It overwrites any previous value for that key
func (kv *KVStore) Put(hkey uint64, value storage.Entry) error {
if len(kv.tables) == 0 {
Expand All @@ -133,7 +168,7 @@ func (kv *KVStore) Put(hkey uint64, value storage.Entry) error {
err := t.put(hkey, value)
if err == errNotEnoughSpace {
// Create a new table and put the new k/v pair in it.
ntSize := kv.Stats().Inuse*2 + len(value.Value()) + defaultTableSize
ntSize := kv.calculateTableSize(len(value.Value()))
nt := newTable(ntSize)
kv.tables = append(kv.tables, nt)
res = storage.ErrFragmented
Expand Down Expand Up @@ -261,7 +296,7 @@ func (kv *KVStore) Delete(hkey uint64) error {
t := kv.tables[0]
if float64(t.allocated)*maxGarbageRatio <= float64(t.garbage) {
// Create a new table here.
nt := newTable(kv.Stats().Inuse * 2)
nt := newTable(kv.calculateTableSize(0))
kv.tables = append(kv.tables, nt)
return storage.ErrFragmented
}
Expand Down
126 changes: 126 additions & 0 deletions internal/kvstore/kvstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,3 +649,129 @@ func Test_Fork(t *testing.T) {
t.Fatalf("Expected Stats.NumTables: 1. Got: %d", stats.NumTables)
}
}

func Test_Put_Big_Value(t *testing.T) {
s, err := testKVStore()
if err != nil {
t.Fatalf("Expected nil. Got %v", err)
}

value := make([]byte, 1<<20)
entry := NewEntry()
entry.SetKey(bkey(1))
entry.SetValue(value)
entry.SetTTL(int64(1))
entry.SetTimestamp(time.Now().UnixNano())
hkey := xxhash.Sum64([]byte(entry.Key()))

err = s.Put(hkey, entry)
if err != storage.ErrFragmented {
t.Fatalf("Expected storage.ErrFragmented. Got %v", err)
}
_, err = s.Get(hkey)
if err != nil {
t.Fatalf("Expected nil. Got: %v", err)
}
}

func Test_calculateTableSize(t *testing.T) {
s, err := testKVStore()
if err != nil {
t.Fatalf("Expected nil. Got %v", err)
}

kv := s.(*KVStore)
t.Run("Big value", func(t *testing.T) {
valueSize := 1 << 21
tableSize := kv.calculateTableSize(valueSize)
if kv.minimumTableSize >= tableSize {
t.Fatalf("Calculated tableSize(%d) has to be bigger than kv.minimumTableSize(%d)", tableSize, kv.minimumTableSize)
}
})

t.Run("Minimum required space is zero", func(t *testing.T) {
tableSize := kv.calculateTableSize(0)
if tableSize != kv.minimumTableSize {
t.Fatalf("Calculated tableSize(%d) has to be equal to kv.minimumTableSize(%d)", tableSize, kv.minimumTableSize)
}
})
}

func Test_MinimumTableSize(t *testing.T) {
s, err := testKVStore()
if err != nil {
t.Fatalf("Expected nil. Got %v", err)
}
kv := s.(*KVStore)
for i := 0; i < 100; i++ {
entry := NewEntry()
entry.SetKey(bkey(i))
entry.SetValue(bval(i))
entry.SetTTL(int64(i))
entry.SetTimestamp(time.Now().UnixNano())
hkey := xxhash.Sum64([]byte(entry.Key()))
err := s.Put(hkey, entry)
if err != nil {
t.Fatalf("Expected nil. Got %v", err)
}
}

tableSize := kv.calculateTableSize(0)
if tableSize != kv.minimumTableSize {
t.Fatalf("Calculated tableSize(%d) has to be equal to kv.minimumTableSize(%d)", tableSize, kv.minimumTableSize)
}
}

func Test_ExpandTable(t *testing.T) {
s, err := testKVStore()
if err != nil {
t.Fatalf("Expected nil. Got %v", err)
}
kv := s.(*KVStore)
for i := 0; i < 10000; i++ {
entry := NewEntry()
entry.SetKey(bkey(i))
entry.SetValue(bval(i))
entry.SetTTL(int64(i))
entry.SetTimestamp(time.Now().UnixNano())
hkey := xxhash.Sum64([]byte(entry.Key()))
err := s.Put(hkey, entry)
if err != nil {
t.Fatalf("Expected nil. Got %v", err)
}
}

st := kv.Stats()
tableSize := kv.calculateTableSize(0)
if tableSize != st.Inuse*2 {
t.Fatalf("Calculated tableSize(%d) has to be equal to Inuse*2(%d)", tableSize, st.Inuse*2)
}
}

func Test_ExpandTable_With_Big_Value(t *testing.T) {
s, err := testKVStore()
if err != nil {
t.Fatalf("Expected nil. Got %v", err)
}
kv := s.(*KVStore)
for i := 0; i < 10000; i++ {
entry := NewEntry()
entry.SetKey(bkey(i))
entry.SetValue(bval(i))
entry.SetTTL(int64(i))
entry.SetTimestamp(time.Now().UnixNano())
hkey := xxhash.Sum64([]byte(entry.Key()))
err := s.Put(hkey, entry)
if err != nil {
t.Fatalf("Expected nil. Got %v", err)
}
}

st := kv.Stats()
bigValueSize := 1 << 21
tableSize := kv.calculateTableSize(bigValueSize)
calculatedTableSize := (st.Inuse * 2) + bigValueSize
if tableSize != calculatedTableSize {
t.Fatalf("Calculated tableSize(%d) has to be equal to Inuse*2 + valueSize(%d)", tableSize, calculatedTableSize)
}
}

0 comments on commit 23cd56d

Please sign in to comment.