Skip to content

Commit

Permalink
Free rocksdb slices in order to avoid memory leaks
Browse files Browse the repository at this point in the history
  • Loading branch information
aalda committed Mar 19, 2019
1 parent b9cd8d2 commit 9292f74
Showing 1 changed file with 26 additions and 9 deletions.
35 changes: 26 additions & 9 deletions storage/rocks/rocksdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ type RocksDBStore struct {
// each checkpoint is created in a subdirectory
// inside checkPointPath folder
checkpoints map[uint64]string

ro *rocksdb.ReadOptions
wo *rocksdb.WriteOptions
}

type rocksdbOpts struct {
Expand Down Expand Up @@ -74,6 +77,8 @@ func NewRocksDBStoreOpts(opts *rocksdbOpts) (*RocksDBStore, error) {
db: db,
checkPointPath: checkPointPath,
checkpoints: make(map[uint64]string),
wo: rocksdb.NewDefaultWriteOptions(),
ro: rocksdb.NewDefaultReadOptions(),
}
return store, nil
}
Expand All @@ -85,15 +90,15 @@ func (s RocksDBStore) Mutate(mutations []*storage.Mutation) error {
key := append([]byte{m.Prefix}, m.Key...)
batch.Put(key, m.Value)
}
err := s.db.Write(rocksdb.NewDefaultWriteOptions(), batch)
err := s.db.Write(s.wo, batch)
return err
}

func (s RocksDBStore) Get(prefix byte, key []byte) (*storage.KVPair, error) {
result := new(storage.KVPair)
result.Key = key
k := append([]byte{prefix}, key...)
v, err := s.db.GetBytes(rocksdb.NewDefaultReadOptions(), k)
v, err := s.db.GetBytes(s.ro, k)
if err != nil {
return nil, err
}
Expand All @@ -108,37 +113,41 @@ func (s RocksDBStore) GetRange(prefix byte, start, end []byte) (storage.KVRange,
result := make(storage.KVRange, 0)
startKey := append([]byte{prefix}, start...)
endKey := append([]byte{prefix}, end...)
it := s.db.NewIterator(rocksdb.NewDefaultReadOptions())
it := s.db.NewIterator(s.ro)
defer it.Close()
for it.Seek(startKey); it.Valid(); it.Next() {
keySlice := it.Key()
key := make([]byte, keySlice.Size())
copy(key, keySlice.Data())
keySlice.Free()
if bytes.Compare(key, endKey) > 0 {
break
}
valueSlice := it.Value()
value := make([]byte, valueSlice.Size())
copy(value, valueSlice.Data())
result = append(result, storage.KVPair{key[1:], value})
result = append(result, storage.KVPair{Key: key[1:], Value: value})
valueSlice.Free()
}

return result, nil
}

func (s RocksDBStore) GetLast(prefix byte) (*storage.KVPair, error) {
it := s.db.NewIterator(rocksdb.NewDefaultReadOptions())
it := s.db.NewIterator(s.ro)
defer it.Close()
it.SeekForPrev([]byte{prefix, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff})
if it.ValidForPrefix([]byte{prefix}) {
result := new(storage.KVPair)
keySlice := it.Key()
key := make([]byte, keySlice.Size())
copy(key, keySlice.Data())
keySlice.Free()
result.Key = key[1:]
valueSlice := it.Value()
value := make([]byte, valueSlice.Size())
copy(value, valueSlice.Data())
valueSlice.Free()
result.Value = value
return result, nil
}
Expand Down Expand Up @@ -166,6 +175,8 @@ func (r *RocksDBKVPairReader) Read(buffer []*storage.KVPair) (n int, err error)
value := make([]byte, valueSlice.Size())
copy(key, keySlice.Data())
copy(value, valueSlice.Data())
keySlice.Free()
valueSlice.Free()
buffer[n] = &storage.KVPair{Key: key[1:], Value: value}
n++
}
Expand All @@ -182,6 +193,8 @@ func (s RocksDBStore) GetAll(prefix byte) storage.KVPairReader {

func (s RocksDBStore) Close() error {
s.db.Close()
s.ro.Destroy()
s.wo.Destroy()
return nil
}

Expand Down Expand Up @@ -231,10 +244,14 @@ func (s *RocksDBStore) Backup(w io.Writer, id uint64) error {
defer it.Close()

for it.SeekToFirst(); it.Valid(); it.Next() {
keySlice := it.Key().Data()
valueSlice := it.Value().Data()
key := append(keySlice[:0:0], keySlice...) // See https://github.com/go101/go101/wiki
value := append(valueSlice[:0:0], valueSlice...)
keySlice := it.Key()
valueSlice := it.Value()
keyData := keySlice.Data()
valueData := valueSlice.Data()
key := append(keyData[:0:0], keyData...) // See https://github.com/go101/go101/wiki
value := append(valueData[:0:0], valueData...)
keySlice.Free()
valueSlice.Free()

entry := &pb.KVPair{
Key: key,
Expand Down

0 comments on commit 9292f74

Please sign in to comment.