From d8053fbb93b7e7577ca64ccf02097abdbdbe08bd Mon Sep 17 00:00:00 2001 From: Ibrahim Jarif Date: Mon, 21 Sep 2020 23:33:47 +0530 Subject: [PATCH] perf(raft): Use raft storage in managedmode (#6457) This PR changes how we use badger in the wal store. Currently we run badger in normal mode for the `w` and `zw` stores. We've seen up to 900K entries for the same hard state (hs) key in the `zw` store. These duplicate keys cause spikes in read latencies in `zw`. The `w` store has more compactions compared to the `zw` store and so it has less stale data and thus fewer spikes in read latencies. The fix here is to open the `w` and `zw` directories in managed mode and perform all writes on the same timestamp (max version in the db). This leads to close to 0 duplicates in the store. This PR also fixes the raft leader election issue, which is a result of high read latencies. (cherry picked from commit 6882e377d24f8c89c9a426b1011c24334d2e7c3a) (cherry picked from commit ea343c99a5930227587422213c6a8b7e80f57080) --- conn/node_test.go | 2 +- dgraph/cmd/zero/run.go | 2 +- go.mod | 2 +- go.sum | 4 +-- raftwal/storage.go | 65 ++++++++++++++++++++++++++++++++++------- raftwal/storage_test.go | 20 ++++++------- worker/draft_test.go | 2 +- worker/server_state.go | 2 +- 8 files changed, 72 insertions(+), 27 deletions(-) diff --git a/conn/node_test.go b/conn/node_test.go index d2f1328adc6..f0fe0718dc9 100644 --- a/conn/node_test.go +++ b/conn/node_test.go @@ -65,7 +65,7 @@ func TestProposal(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - db, err := badger.Open(badger.DefaultOptions(dir)) + db, err := badger.OpenManaged(badger.DefaultOptions(dir)) require.NoError(t, err) store := raftwal.Init(db, 0, 0) defer store.Closer.SignalAndWait() diff --git a/dgraph/cmd/zero/run.go b/dgraph/cmd/zero/run.go index bbf0841bd6c..b968e7226a1 100644 --- a/dgraph/cmd/zero/run.go +++ b/dgraph/cmd/zero/run.go @@ -279,7 +279,7 @@ func run() { } glog.Infof("Opening zero BadgerDB with options: %+v\n", 
kvOpt) - kv, err := badger.Open(kvOpt) + kv, err := badger.OpenManaged(kvOpt) x.Checkf(err, "Error while opening WAL store") defer kv.Close() diff --git a/go.mod b/go.mod index 52127c3d3d3..70114f93d13 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/blevesearch/segment v0.0.0-20160915185041-762005e7a34f // indirect github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd - github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4 + github.com/dgraph-io/badger/v2 v2.2007.3-0.20200921170002-6a6b506c7386 github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 github.com/dgraph-io/ristretto v0.0.4-0.20200904131139-4dec2770af66 github.com/dgrijalva/jwt-go v3.2.0+incompatible diff --git a/go.sum b/go.sum index 5366ae15d82..3e5ccc48df8 100644 --- a/go.sum +++ b/go.sum @@ -62,8 +62,8 @@ github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkE github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4 h1:DUDFTVgqZysKplH39/ya0aI4+zGm91L9QttXgITT2YE= -github.com/dgraph-io/badger/v2 v2.2007.2-0.20200827131741-d5a25b83fbf4/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= +github.com/dgraph-io/badger/v2 v2.2007.3-0.20200921170002-6a6b506c7386 h1:n8dsIfgnoDeAuTKxi0gr2uhBFl62ukfA7cykc3nqeyE= +github.com/dgraph-io/badger/v2 v2.2007.3-0.20200921170002-6a6b506c7386/go.mod h1:26P/7fbL4kUZVEVKLAKXkBXKOydDmM2p1e+NhhnBCAE= github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 h1:5leDFqGys055YO3TbghBhk/QdRPEwyLPdgsSJfiR20I= github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI= 
github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= diff --git a/raftwal/storage.go b/raftwal/storage.go index d362cd3ec35..9f1a02a78d6 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -19,6 +19,7 @@ package raftwal import ( "bytes" "encoding/binary" + "fmt" "math" "sync" @@ -34,12 +35,16 @@ import ( "golang.org/x/net/trace" ) +// versionKey is hardcoded into the special key used to fetch the maximum version from the DB. +const versionKey = 1 + // DiskStorage handles disk access and writing for the RAFT write-ahead log. type DiskStorage struct { - db *badger.DB - id uint64 - gid uint32 - elog trace.EventLog + db *badger.DB + commitTs uint64 + id uint64 + gid uint32 + elog trace.EventLog cache *sync.Map Closer *z.Closer @@ -60,6 +65,8 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage { Closer: z.NewCloser(1), indexRangeChan: make(chan indexRange, 16), } + + w.fetchMaxVersion() if prev, err := RaftId(db); err != nil || prev != id { x.Check(w.StoreRaftId(id)) } @@ -88,11 +95,40 @@ func Init(db *badger.DB, id uint64, gid uint32) *DiskStorage { return w } +// fetchMaxVersion fetches the commitTs to be used in the raftwal. The version is +// fetched from the special key "maxVersion-id" or from db.MaxVersion +// API which uses the stream framework. +func (w *DiskStorage) fetchMaxVersion() { + // This is a special key that is used to fetch the latest version. + key := []byte(fmt.Sprintf("maxVersion-%d", versionKey)) + + txn := w.db.NewTransactionAt(math.MaxUint64, true) + defer txn.Discard() + + item, err := txn.Get(key) + if err == nil { + w.commitTs = item.Version() + return + } + if err == badger.ErrKeyNotFound { + // We don't have the special key so get it using the MaxVersion API. + version, err := w.db.MaxVersion() + x.Check(err) + + w.commitTs = version + 1 + // Insert the same key back into badger for reuse. 
+ x.Check(txn.Set(key, nil)) + x.Check(txn.CommitAt(w.commitTs, nil)) + } else { + x.Check(err) + } +} + func (w *DiskStorage) processIndexRange() { defer w.Closer.Done() processSingleRange := func(r indexRange) { - batch := w.db.NewWriteBatch() + batch := w.db.NewWriteBatchAt(w.commitTs) if err := w.deleteRange(batch, r.from, r.until); err != nil { glog.Errorf("deleteRange failed with error: %v, from: %d, until: %d\n", err, r.from, r.until) @@ -189,9 +225,18 @@ func (w *DiskStorage) entryPrefix() []byte { return b } +func (w *DiskStorage) update(cb func(txn *badger.Txn) error) error { + txn := w.db.NewTransactionAt(math.MaxUint64, true) + defer txn.Discard() + if err := cb(txn); err != nil { + return err + } + return txn.CommitAt(w.commitTs, nil) +} + // StoreRaftId stores the given RAFT ID in disk. func (w *DiskStorage) StoreRaftId(id uint64) error { - return w.db.Update(func(txn *badger.Txn) error { + return w.update(func(txn *badger.Txn) error { var b [8]byte binary.BigEndian.PutUint64(b[:], id) return txn.Set(idKey, b[:]) @@ -200,7 +245,7 @@ func (w *DiskStorage) StoreRaftId(id uint64) error { // UpdateCheckpoint writes the given snapshot to disk. func (w *DiskStorage) UpdateCheckpoint(snap *pb.Snapshot) error { - return w.db.Update(func(txn *badger.Txn) error { + return w.update(func(txn *badger.Txn) error { data, err := snap.Marshal() if err != nil { return err @@ -450,7 +495,7 @@ func (w *DiskStorage) reset(es []raftpb.Entry) error { w.cache = new(sync.Map) // reset cache. // Clean out the state. 
- batch := w.db.NewWriteBatch() + batch := w.db.NewWriteBatchAt(w.commitTs) defer batch.Cancel() if err := w.deleteFrom(batch, 0); err != nil { @@ -676,7 +721,7 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte snap.Metadata.ConfState = *cs snap.Data = data - batch := w.db.NewWriteBatch() + batch := w.db.NewWriteBatchAt(w.commitTs) defer batch.Cancel() if err := w.setSnapshot(batch, snap); err != nil { return err @@ -698,7 +743,7 @@ func (w *DiskStorage) CreateSnapshot(i uint64, cs *raftpb.ConfState, data []byte // writes then all of them can be written together. Note that when writing an Entry with Index i, // any previously-persisted entries with Index >= i must be discarded. func (w *DiskStorage) Save(h raftpb.HardState, es []raftpb.Entry, snap raftpb.Snapshot) error { - batch := w.db.NewWriteBatch() + batch := w.db.NewWriteBatchAt(w.commitTs) defer batch.Cancel() if err := w.addEntries(batch, es); err != nil { diff --git a/raftwal/storage_test.go b/raftwal/storage_test.go index 4b5ab23ee83..d923c9fcc21 100644 --- a/raftwal/storage_test.go +++ b/raftwal/storage_test.go @@ -50,7 +50,7 @@ func TestStorageTerm(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - db, err := badger.Open(badger.DefaultOptions(dir)) + db, err := badger.OpenManaged(badger.DefaultOptions(dir)) require.NoError(t, err) ds := Init(db, 0, 0) defer ds.Closer.SignalAndWait() @@ -101,7 +101,7 @@ func TestStorageEntries(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - db, err := badger.Open(badger.DefaultOptions(dir)) + db, err := badger.OpenManaged(badger.DefaultOptions(dir)) require.NoError(t, err) ds := Init(db, 0, 0) defer ds.Closer.SignalAndWait() @@ -147,7 +147,7 @@ func TestStorageLastIndex(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - db, err := badger.Open(badger.DefaultOptions(dir)) + db, err := badger.OpenManaged(badger.DefaultOptions(dir)) require.NoError(t, err) ds := Init(db, 0, 0) defer 
ds.Closer.SignalAndWait() @@ -178,7 +178,7 @@ func TestStorageFirstIndex(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - db, err := badger.Open(badger.DefaultOptions(dir)) + db, err := badger.OpenManaged(badger.DefaultOptions(dir)) require.NoError(t, err) ds := Init(db, 0, 0) defer ds.Closer.SignalAndWait() @@ -194,7 +194,7 @@ func TestStorageFirstIndex(t *testing.T) { t.Errorf("first = %d, want %d", first, 4) } - batch := db.NewWriteBatch() + batch := db.NewWriteBatchAt(ds.commitTs) require.NoError(t, ds.deleteRange(batch, 0, 4)) require.NoError(t, batch.Flush()) ds.cache.Store(firstKey, 0) @@ -212,7 +212,7 @@ func TestStorageCompact(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - db, err := badger.Open(badger.DefaultOptions(dir)) + db, err := badger.OpenManaged(badger.DefaultOptions(dir)) require.NoError(t, err) ds := Init(db, 0, 0) defer ds.Closer.SignalAndWait() @@ -237,7 +237,7 @@ func TestStorageCompact(t *testing.T) { for i, tt := range tests { first, err := ds.FirstIndex() require.NoError(t, err) - batch := db.NewWriteBatch() + batch := db.NewWriteBatchAt(ds.commitTs) err = ds.deleteRange(batch, first-1, tt.i) require.NoError(t, batch.Flush()) if err != tt.werr { @@ -264,7 +264,7 @@ func TestStorageCreateSnapshot(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - db, err := badger.Open(badger.DefaultOptions(dir)) + db, err := badger.OpenManaged(badger.DefaultOptions(dir)) require.NoError(t, err) ds := Init(db, 0, 0) defer ds.Closer.SignalAndWait() @@ -302,7 +302,7 @@ func TestStorageAppend(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - db, err := badger.Open(badger.DefaultOptions(dir)) + db, err := badger.OpenManaged(badger.DefaultOptions(dir)) require.NoError(t, err) ds := Init(db, 0, 0) defer ds.Closer.SignalAndWait() @@ -351,7 +351,7 @@ func TestStorageAppend(t *testing.T) { for i, tt := range tests { require.NoError(t, ds.reset(ents)) - batch := db.NewWriteBatch() + batch := 
db.NewWriteBatchAt(ds.commitTs) err := ds.addEntries(batch, tt.entries) if err != tt.werr { t.Errorf("#%d: err = %v, want %v", i, err, tt.werr) diff --git a/worker/draft_test.go b/worker/draft_test.go index 744739e12a1..a3c32642520 100644 --- a/worker/draft_test.go +++ b/worker/draft_test.go @@ -56,7 +56,7 @@ func TestCalculateSnapshot(t *testing.T) { require.NoError(t, err) defer os.RemoveAll(dir) - db, err := openBadger(dir) + db, err := badger.OpenManaged(badger.DefaultOptions(dir)) require.NoError(t, err) ds := raftwal.Init(db, 0, 0) defer ds.Closer.SignalAndWait() diff --git a/worker/server_state.go b/worker/server_state.go index 9694ff916ed..7963493422d 100644 --- a/worker/server_state.go +++ b/worker/server_state.go @@ -155,7 +155,7 @@ func (s *ServerState) initStorage() { glog.Infof("Opening write-ahead log BadgerDB with options: %+v\n", opt) opt.EncryptionKey = key - s.WALstore, err = badger.Open(opt) + s.WALstore, err = badger.OpenManaged(opt) x.Checkf(err, "Error while creating badger KV WAL store") } {