Skip to content

Commit

Permalink
perf(raft): Use raft storage in managedmode (#6457) (#6546)
Browse files Browse the repository at this point in the history
This PR changes how we use badger in the wal store. Currently we run
badger in normal mode for `w` and `zw` store. We've seen up to 900K
entries for the same hard state (hs) key in `zw` store. These duplicate
keys cause spikes in read latencies in `zw`. The `w` store has more
compactions compared to `zw` store and so it has lesser stale data and
thus lesser spikes in read latencies.

The fix here is to open the `w` and `zw` directories in managed mode and
perform all writes on the same timestamp (max version in the db). This
leads to close to 0 duplicates in the store.

This PR also fixes the raft leader election issue which is a result of high
read latencies.

(cherry picked from commit 6882e37)
  • Loading branch information
Ibrahim Jarif authored Sep 28, 2020
1 parent 669a851 commit 9732c30
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 27 deletions.
2 changes: 1 addition & 1 deletion conn/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func TestProposal(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := badger.Open(badger.DefaultOptions(dir))
db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
store := raftwal.Init(db, 0, 0)
defer store.Closer.SignalAndWait()
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/zero/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func run() {
}
glog.Infof("Opening zero BadgerDB with options: %+v\n", kvOpt)

kv, err := badger.Open(kvOpt)
kv, err := badger.OpenManaged(kvOpt)
x.Checkf(err, "Error while opening WAL store")
defer kv.Close()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ require (
github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect
github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4
github.com/dgraph-io/badger/v2 v2.2007.3-0.20200921170002-6a6b506c7386
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6
github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66
github.com/dgrijalva/jwt-go v3.2.0+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkE
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4 h1:DUDFTVgqZysKplH39/ya0aI4+zGm91L9QttXgITT2YE=
github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE=
github.com/dgraph-io/badger/v2 v2.2007.3-0.20200921170002-6a6b506c7386 h1:n8dsIfgnoDeAuTKxi0gr2uhBFl62ukfA7cykc3nqeyE=
github.com/dgraph-io/badger/v2 v2.2007.3-0.20200921170002-6a6b506c7386/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE=
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 h1:5leDFqGys055YO3TbghBhk/QdRPEwyLPdgsSJfiR20I=
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI=
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E=
Expand Down
65 changes: 55 additions & 10 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package raftwal
import (
"bytes"
"encoding/binary"
"fmt"
"math"
"sync"

Expand All @@ -34,12 +35,16 @@ import (
"golang.org/x/net/trace"
)

// versionKey is hardcoded into the special key used to fetch the maximum version from the DB.
const versionKey = 1

// DiskStorage handles disk access and writing for the RAFT write-ahead log.
type DiskStorage struct {
db *badger.DB
id uint64
gid uint32
elog trace.EventLog
db *badger.DB
commitTs uint64
id uint64
gid uint32
elog trace.EventLog

cache *sync.Map
Closer *z.Closer
Expand All @@ -60,6 +65,8 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {
Closer: z.NewCloser(1),
indexRangeChan: make(chan indexRange, 16),
}

w.fetchMaxVersion()
if prev, err := RaftId(db); err != nil || prev != id {
x.Check(w.StoreRaftId(id))
}
Expand Down Expand Up @@ -88,11 +95,40 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage {
return w
}

// fetchMaxVersion fetches the commitTs to be used in the raftwal. The version is
// fetched from the special key "maxVersion-id" or from db.MaxVersion
// API which uses the stream framework.
func (w *DiskStorage) fetchMaxVersion() {
// This is a special key that is used to fetch the latest version.
key := []byte(fmt.Sprintf("maxVersion-%d", versionKey))

txn := w.db.NewTransactionAt(math.MaxUint64, true)
defer txn.Discard()

item, err := txn.Get(key)
if err == nil {
w.commitTs = item.Version()
return
}
if err == badger.ErrKeyNotFound {
// We don't have the special key so get it using the MaxVersion API.
version, err := w.db.MaxVersion()
x.Check(err)

w.commitTs = version + 1
// Insert the same key back into badger for reuse.
x.Check(txn.Set(key, nil))
x.Check(txn.CommitAt(w.commitTs, nil))
} else {
x.Check(err)
}
}

func (w *DiskStorage) processIndexRange() {
defer w.Closer.Done()

processSingleRange := func(r indexRange) {
batch := w.db.NewWriteBatch()
batch := w.db.NewWriteBatchAt(w.commitTs)
if err := w.deleteRange(batch, r.from, r.until); err != nil {
glog.Errorf("deleteRange failed with error: %v, from: %d, until: %d\n",
err, r.from, r.until)
Expand Down Expand Up @@ -189,9 +225,18 @@ func (w *DiskStorage) entryPrefix() []byte {
return b
}

func (w *DiskStorage) update(cb func(txn *badger.Txn) error) error {
txn := w.db.NewTransactionAt(math.MaxUint64, true)
defer txn.Discard()
if err := cb(txn); err != nil {
return err
}
return txn.CommitAt(w.commitTs, nil)
}

// StoreRaftId stores the given RAFT ID in disk.
func (w *DiskStorage) StoreRaftId(id uint64) error {
return w.db.Update(func(txn *badger.Txn) error {
return w.update(func(txn *badger.Txn) error {
var b [8]byte
binary.BigEndian.PutUint64(b[:], id)
return txn.Set(idKey, b[:])
Expand All @@ -200,7 +245,7 @@ func (w *DiskStorage) StoreRaftId(id uint64) error {

// UpdateCheckpoint writes the given snapshot to disk.
func (w *DiskStorage) UpdateCheckpoint(snap *pb.Snapshot) error {
return w.db.Update(func(txn *badger.Txn) error {
return w.update(func(txn *badger.Txn) error {
data, err := snap.Marshal()
if err != nil {
return err
Expand Down Expand Up @@ -450,7 +495,7 @@ func (w *DiskStorage) reset(es []raftpb.Entry) error {
w.cache = new(sync.Map) // reset cache.

// Clean out the state.
batch := w.db.NewWriteBatch()
batch := w.db.NewWriteBatchAt(w.commitTs)
defer batch.Cancel()

if err := w.deleteFrom(batch, 0); err != nil {
Expand Down Expand Up @@ -676,7 +721,7 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte
snap.Metadata.ConfState = *cs
snap.Data = data

batch := w.db.NewWriteBatch()
batch := w.db.NewWriteBatchAt(w.commitTs)
defer batch.Cancel()
if err := w.setSnapshot(batch, snap); err != nil {
return err
Expand All @@ -698,7 +743,7 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte
// writes then all of them can be written together. Note that when writing an Entry with Index i,
// any previously-persisted entries with Index >= i must be discarded.
func (w *DiskStorage) Save(h raftpb.HardState, es []raftpb.Entry, snap raftpb.Snapshot) error {
batch := w.db.NewWriteBatch()
batch := w.db.NewWriteBatchAt(w.commitTs)
defer batch.Cancel()

if err := w.addEntries(batch, es); err != nil {
Expand Down
20 changes: 10 additions & 10 deletions raftwal/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestStorageTerm(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := badger.Open(badger.DefaultOptions(dir))
db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()
Expand Down Expand Up @@ -101,7 +101,7 @@ func TestStorageEntries(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := badger.Open(badger.DefaultOptions(dir))
db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestStorageLastIndex(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := badger.Open(badger.DefaultOptions(dir))
db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestStorageFirstIndex(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := badger.Open(badger.DefaultOptions(dir))
db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()
Expand All @@ -194,7 +194,7 @@ func TestStorageFirstIndex(t *testing.T) {
t.Errorf("first = %d, want %d", first, 4)
}

batch := db.NewWriteBatch()
batch := db.NewWriteBatchAt(ds.commitTs)
require.NoError(t, ds.deleteRange(batch, 0, 4))
require.NoError(t, batch.Flush())
ds.cache.Store(firstKey, 0)
Expand All @@ -212,7 +212,7 @@ func TestStorageCompact(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := badger.Open(badger.DefaultOptions(dir))
db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()
Expand All @@ -237,7 +237,7 @@ func TestStorageCompact(t *testing.T) {
for i, tt := range tests {
first, err := ds.FirstIndex()
require.NoError(t, err)
batch := db.NewWriteBatch()
batch := db.NewWriteBatchAt(ds.commitTs)
err = ds.deleteRange(batch, first-1, tt.i)
require.NoError(t, batch.Flush())
if err != tt.werr {
Expand All @@ -264,7 +264,7 @@ func TestStorageCreateSnapshot(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := badger.Open(badger.DefaultOptions(dir))
db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()
Expand Down Expand Up @@ -302,7 +302,7 @@ func TestStorageAppend(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := badger.Open(badger.DefaultOptions(dir))
db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := Init(db, 0, 0)
defer ds.Closer.SignalAndWait()
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestStorageAppend(t *testing.T) {

for i, tt := range tests {
require.NoError(t, ds.reset(ents))
batch := db.NewWriteBatch()
batch := db.NewWriteBatchAt(ds.commitTs)
err := ds.addEntries(batch, tt.entries)
if err != tt.werr {
t.Errorf("#%d: err = %v, want %v", i, err, tt.werr)
Expand Down
2 changes: 1 addition & 1 deletion worker/draft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestCalculateSnapshot(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(dir)

db, err := openBadger(dir)
db, err := badger.OpenManaged(badger.DefaultOptions(dir))
require.NoError(t, err)
ds := raftwal.Init(db, 0, 0)
defer ds.Closer.SignalAndWait()
Expand Down
2 changes: 1 addition & 1 deletion worker/server_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (s *ServerState) initStorage() {
glog.Infof("Opening write-ahead log BadgerDB with options: %+v\n", opt)
opt.EncryptionKey = key

s.WALstore, err = badger.Open(opt)
s.WALstore, err = badger.OpenManaged(opt)
x.Checkf(err, "Error while creating badger KV WAL store")
}
{
Expand Down

0 comments on commit 9732c30

Please sign in to comment.