Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Take snapshots less frequently #3367

Merged
merged 9 commits into from
May 3, 2019
15 changes: 15 additions & 0 deletions dgraph/cmd/alpha/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,15 @@ they form a Raft group and provide synchronous replication.
"[mmap, disk] Specifies how Badger Value log is stored."+
" mmap consumes more RAM, but provides better performance.")

// Snapshot and Transactions.
flag.Int("snapshot_after", 10000,
"Create a new Raft snapshot after this many number of Raft entries. The"+
" lower this number, the more frequent snapshot creation would be."+
" Also determines how often Rollups would happen.")
flag.String("abort_older_than", "5m",
"Abort any pending transactions older than this duration. The liveness of transaction"+
" is determined based on the last mutation ran by the transaction.")

// OpenCensus flags.
flag.Float64("trace", 1.0, "The ratio of queries to trace.")
flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.")
Expand Down Expand Up @@ -462,6 +471,10 @@ func run() {

ips, err := getIPsFromString(Alpha.Conf.GetString("whitelist"))
x.Check(err)

abortDur, err := time.ParseDuration(Alpha.Conf.GetString("abort_older_than"))
x.Check(err)

x.WorkerConfig = x.WorkerOptions{
ExportPath: Alpha.Conf.GetString("export"),
NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
Expand All @@ -474,6 +487,8 @@ func run() {
MaxRetries: Alpha.Conf.GetInt("max_retries"),
StrictMutations: opts.MutationsMode == edgraph.StrictMutations,
AclEnabled: secretFile != "",
SnapshotAfter: Alpha.Conf.GetInt("snapshot_after"),
AbortOlderThan: abortDur,
}

setupCustomTokenizers()
Expand Down
3 changes: 3 additions & 0 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ func printKeys(db *badger.DB) {

// Don't use a switch case here, because multiple of these can be true. In particular,
// IsSchema can be true alongside IsData.
if pk.IsRaft() {
buf.WriteString("{r}")
}
if pk.IsData() {
buf.WriteString("{d}")
}
Expand Down
110 changes: 86 additions & 24 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ type node struct {
gid uint32
closer *y.Closer

lastCommitTs uint64 // Only used to ensure that our commit Ts is monotonically increasing.

streaming int32 // Used to avoid calculating snapshot

canCampaign bool
Expand Down Expand Up @@ -540,12 +538,7 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
}

for _, status := range delta.Txns {
if status.CommitTs > 0 && status.CommitTs < n.lastCommitTs {
glog.Errorf("Lastcommit %d > current %d. This would cause some commits to be lost.",
n.lastCommitTs, status.CommitTs)
}
toDisk(status.StartTs, status.CommitTs)
n.lastCommitTs = status.CommitTs
}
if err := writer.Flush(); err != nil {
return x.Errorf("Error while flushing to disk: %v", err)
Expand Down Expand Up @@ -675,6 +668,66 @@ func (n *node) rampMeter() {
time.Sleep(3 * time.Millisecond)
}
}

// findRaftProgress looks up the value stored under x.RaftKey() in pstore,
// decodes it as a pb.Snapshot and returns that snapshot's Index.
//
// A missing key is not an error: the returned index is then 0 with a nil
// error. Any other lookup, read or unmarshal failure is returned to the caller.
func (n *node) findRaftProgress() (uint64, error) {
	var index uint64

	readProgress := func(txn *badger.Txn) error {
		item, err := txn.Get(x.RaftKey())
		switch {
		case err == badger.ErrKeyNotFound:
			// Nothing has been recorded yet; leave index at its zero value.
			return nil
		case err != nil:
			return err
		}
		return item.Value(func(raw []byte) error {
			var stored pb.Snapshot
			if uerr := stored.Unmarshal(raw); uerr != nil {
				return uerr
			}
			// Only publish the index once decoding has fully succeeded.
			index = stored.Index
			return nil
		})
	}

	err := pstore.View(readProgress)
	return index, err
}

// updateRaftProgress persists, under x.RaftKey() in pstore, the snapshot
// returned by n.calculateSnapshot(10), but only when that snapshot's Index is
// strictly ahead of the index already stored there.
//
// Both leader and followers can independently update their Raft progress. We don't store
// this in Raft WAL. Instead, this is used to just skip over log records that this Alpha
// has already applied, to speed up things on a restart.
//
// It returns nil when there is nothing to record (no snapshot was calculated,
// or the stored progress is already at or beyond it) and a non-nil error when
// calculating, reading, marshalling or writing the progress fails.
func (n *node) updateRaftProgress() error {
	snap, err := n.calculateSnapshot(10)
	if err != nil {
		return err
	}
	if snap == nil {
		return nil
	}

	// Let's check what we already have. And only update if the new snap.Index is ahead of the
	// last stored applied.
	applied, err := n.findRaftProgress()
	if err != nil {
		return err
	}
	if snap.Index <= applied {
		return nil
	}

	data, err := snap.Marshal()
	if err != nil {
		// Hand the failure back like every other error path in this function, so
		// the caller decides how to react, instead of routing it through x.Check.
		return err
	}

	txn := pstore.NewTransactionAt(math.MaxUint64, true)
	// Discard is deferred unconditionally so the transaction is released on
	// every return path, including the error returns below.
	defer txn.Discard()

	if err := txn.Set(x.RaftKey(), data); err != nil {
		return err
	}
	if err := txn.CommitAt(1, nil); err != nil {
		return err
	}
	glog.V(1).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
	return nil
}

func (n *node) Run() {
defer n.closer.Done() // CLOSER:1

Expand All @@ -699,7 +752,13 @@ func (n *node) Run() {
close(done)
}()

var snapshotLoops uint64
applied, err := n.findRaftProgress()
if err != nil {
glog.Errorf("While trying to find raft progress: %v", err)
} else {
glog.Infof("Found Raft progress in p directory: %d", applied)
}

for {
select {
case <-done:
Expand All @@ -712,23 +771,23 @@ func (n *node) Run() {

case <-slowTicker.C:
n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
if err := n.updateRaftProgress(); err != nil {
glog.Errorf("While updating Raft progress: %v", err)
}

if leader {
// We try to take a snapshot every slow tick duration, with a 1000 discard entries.
// But, once a while, we take a snapshot with 10 discard entries. This avoids the
// scenario where after bringing up an Alpha, and doing a hundred schema updates, we
// don't take any snapshots because there are not enough updates (discardN=10),
// which then really slows down restarts. At the same time, by checking more
// frequently, we can quickly take a snapshot if a lot of mutations are coming in
// fast (discardN=1000).
discardN := 1000
if snapshotLoops%5 == 0 {
discardN = 10
}
snapshotLoops++
// We keep track of the applied index in the p directory. Even if we don't take a
// snapshot for a while, let the Raft logs grow, and then restart, we would not have
// to replay all the log entries, because we can tell Raft.Config to set Applied to
// that index.
// This applied index tracking also covers the case when we have a big index
// rebuild. The rebuild would be tracked just like others and would not need to be
// replayed after a restart, because the Applied config would let us skip right
// through it.
// We use disk based storage for Raft. So, we're not too concerned about
// snapshotting. We just need to do enough, so that we don't have a huge backlog of
// entries to process on a restart.
if err := n.proposeSnapshot(discardN); err != nil {
if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
x.Errorf("While calculating and proposing snapshot: %v", err)
}
go n.abortOldTransactions()
Expand Down Expand Up @@ -845,6 +904,10 @@ func (n *node) Run() {
n.elog.Printf("Found empty data at index: %d", entry.Index)
n.Applied.Done(entry.Index)

} else if entry.Index < applied {
n.elog.Printf("Skipping over already applied entry: %d", entry.Index)
n.Applied.Done(entry.Index)

} else {
proposal := &pb.Proposal{}
if err := proposal.Unmarshal(entry.Data); err != nil {
Expand Down Expand Up @@ -1044,7 +1107,6 @@ func (n *node) blockingAbort(req *pb.TxnTimestamps) error {

// Let's propose the txn updates received from Zero. This is important because there are edge
// cases where a txn status might have been missed by the group.
glog.Infof("TryAbort returned with delta: %+v\n", delta)
aborted := &pb.OracleDelta{}
for _, txn := range delta.Txns {
// Only pick the aborts. DO NOT propose the commits. They must come in the right order via
Expand Down Expand Up @@ -1073,14 +1135,14 @@ func (n *node) blockingAbort(req *pb.TxnTimestamps) error {
// abort. Note that only the leader runs this function.
func (n *node) abortOldTransactions() {
// Aborts if not already committed.
starts := posting.Oracle().TxnOlderThan(5 * time.Minute)
starts := posting.Oracle().TxnOlderThan(x.WorkerConfig.AbortOlderThan)
if len(starts) == 0 {
return
}
glog.Infof("Found %d old transactions. Acting to abort them.\n", len(starts))
req := &pb.TxnTimestamps{Ts: starts}
err := n.blockingAbort(req)
glog.Infof("abortOldTransactions for %d txns. Error: %+v\n", len(req.Ts), err)
glog.Infof("Done abortOldTransactions for %d txns. Error: %+v\n", len(req.Ts), err)
}

// calculateSnapshot would calculate a snapshot index, considering these factors:
Expand Down
7 changes: 6 additions & 1 deletion x/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package x

import "net"
import (
"net"
"time"
)

type Options struct {
DebugMode bool
Expand All @@ -43,6 +46,8 @@ type WorkerOptions struct {
MaxRetries int
StrictMutations bool
AclEnabled bool
AbortOlderThan time.Duration
SnapshotAfter int
}

var WorkerConfig WorkerOptions
16 changes: 16 additions & 0 deletions x/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
DefaultPrefix = byte(0x00)
byteSchema = byte(0x01)
byteType = byte(0x02)
byteRaft = byte(0xff)
)

func writeAttr(buf []byte, attr string) []byte {
Expand All @@ -48,6 +49,13 @@ func writeAttr(buf []byte, attr string) []byte {
return rest[len(attr):]
}

// RaftKey returns a freshly allocated 5-byte key: the byteRaft prefix byte
// followed by the ASCII bytes of "raft".
func RaftKey() []byte {
	const tag = "raft"

	key := make([]byte, 1+len(tag))
	key[0] = byteRaft
	// copy reports how many bytes it wrote; the whole tag must fit after the prefix.
	written := copy(key[1:], tag)
	AssertTrue(written == len(tag))
	return key
}

// SchemaKey returns schema key for given attribute. Schema keys are stored
// separately with unique prefix, since we need to iterate over all schema keys.
func SchemaKey(attr string) []byte {
Expand Down Expand Up @@ -136,6 +144,10 @@ type ParsedKey struct {
bytePrefix byte
}

// IsRaft reports whether the parsed key's prefix byte equals byteRaft.
func (p ParsedKey) IsRaft() bool {
return p.bytePrefix == byteRaft
}

// IsData reports whether the parsed key carries the DefaultPrefix prefix byte
// and the ByteData type byte.
func (p ParsedKey) IsData() bool {
	if p.bytePrefix != DefaultPrefix {
		return false
	}
	return p.byteType == ByteData
}
Expand Down Expand Up @@ -295,6 +307,10 @@ func Parse(key []byte) *ParsedKey {
p := &ParsedKey{}

p.bytePrefix = key[0]
if p.bytePrefix == byteRaft {
return p
}

sz := int(binary.BigEndian.Uint16(key[1:3]))
k := key[3:]

Expand Down