Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow truncating raft logs via debug tool #3345

Merged
merged 4 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions dgraph/cmd/debug/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,15 @@ type flagOptions struct {
predicate string
readOnly bool
pdir string
wdir string
itemMeta bool
jepsen string
readTs uint64
sizeHistogram bool
noKeys bool

// Options related to the WAL.
wdir string
wtruncateUntil uint64
}

func init() {
Expand All @@ -82,9 +85,12 @@ func init() {
flag.StringVarP(&opt.keyLookup, "lookup", "l", "", "Hex of key to lookup.")
flag.BoolVarP(&opt.keyHistory, "history", "y", false, "Show all versions of a key.")
flag.StringVarP(&opt.pdir, "postings", "p", "", "Directory where posting lists are stored.")
flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
flag.BoolVar(&opt.sizeHistogram, "histogram", false,
"Show a histogram of the key and value sizes.")

flag.StringVarP(&opt.wdir, "wal", "w", "", "Directory where Raft write-ahead logs are stored.")
flag.Uint64VarP(&opt.wtruncateUntil, "truncate", "t", 0,
"Remove data from Raft entries until but not including this index.")
}

func toInt(o *pb.Posting) int {
Expand Down Expand Up @@ -830,6 +836,11 @@ func parseWal(db *badger.DB) error {
startIdx := snap.Metadata.Index + 1
fmt.Printf("Last Index: %d . Num Entries: %d .\n\n", lastIdx, lastIdx-startIdx)

// In case we need to truncate raft entries.
batch := db.NewWriteBatch()
defer batch.Cancel()
var numTruncates int

pending = make(map[uint64]bool)
for startIdx < lastIdx-1 {
entries, err := store.Entries(startIdx, lastIdx, 64<<20 /* 64 MB Max Size */)
Expand All @@ -838,10 +849,35 @@ func parseWal(db *badger.DB) error {
return
}
for _, ent := range entries {
printEntry(ent)
switch {
case ent.Type == raftpb.EntryNormal && ent.Index < opt.wtruncateUntil:
if len(ent.Data) == 0 {
continue
}
ent.Data = nil
numTruncates++
k := store.EntryKey(ent.Index)
data, err := ent.Marshal()
if err != nil {
log.Fatalf("Unable to marshal entry: %+v. Error: %v", ent, err)
}
if err := batch.Set(k, data, 0); err != nil {
log.Fatalf("Unable to set data: %+v", err)
}
default:
printEntry(ent)
}
startIdx = x.Max(startIdx, ent.Index)
}
}
if err := batch.Flush(); err != nil {
fmt.Printf("Got error while flushing batch: %v\n", err)
}
if numTruncates > 0 {
fmt.Printf("==> Log entries truncated: %d\n\n", numTruncates)
err := db.Flatten(1)
fmt.Printf("Flatten done with error: %v\n", err)
}
}

for rid := range rids {
Expand Down
22 changes: 11 additions & 11 deletions raftwal/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (w *DiskStorage) hardStateKey() []byte {
return b
}

func (w *DiskStorage) entryKey(idx uint64) []byte {
func (w *DiskStorage) EntryKey(idx uint64) []byte {
b := make([]byte, 20)
binary.BigEndian.PutUint64(b[0:8], w.id)
binary.BigEndian.PutUint32(b[8:12], w.gid)
Expand Down Expand Up @@ -168,7 +168,7 @@ func (w *DiskStorage) seekEntry(e *pb.Entry, seekTo uint64, reverse bool) (uint6
itr := txn.NewIterator(opt)
defer itr.Close()

itr.Seek(w.entryKey(seekTo))
itr.Seek(w.EntryKey(seekTo))
if !itr.Valid() {
return errNotFound
}
Expand Down Expand Up @@ -240,7 +240,7 @@ func (w *DiskStorage) deleteUntil(batch *badger.WriteBatch, until uint64) error
itr := txn.NewIterator(opt)
defer itr.Close()

start := w.entryKey(0)
start := w.EntryKey(0)
first := true
var index uint64
for itr.Seek(start); itr.Valid(); itr.Next() {
Expand Down Expand Up @@ -311,7 +311,7 @@ func (w *DiskStorage) setSnapshot(batch *badger.WriteBatch, s pb.Snapshot) error
if err != nil {
return err
}
if err := batch.Set(w.entryKey(e.Index), data, 0); err != nil {
if err := batch.Set(w.EntryKey(e.Index), data, 0); err != nil {
return err
}

Expand Down Expand Up @@ -358,7 +358,7 @@ func (w *DiskStorage) reset(es []pb.Entry) error {
if err != nil {
return x.Wrapf(err, "wal.Store: While marshal entry")
}
k := w.entryKey(e.Index)
k := w.EntryKey(e.Index)
if err := batch.Set(k, data, 0); err != nil {
return err
}
Expand All @@ -383,7 +383,7 @@ func (w *DiskStorage) deleteKeys(batch *badger.WriteBatch, keys []string) error
func (w *DiskStorage) deleteFrom(batch *badger.WriteBatch, from uint64) error {
var keys []string
err := w.db.View(func(txn *badger.Txn) error {
start := w.entryKey(from)
start := w.EntryKey(from)
opt := badger.DefaultIteratorOptions
opt.PrefetchValues = false
opt.Prefix = w.entryPrefix()
Expand Down Expand Up @@ -445,7 +445,7 @@ func (w *DiskStorage) NumEntries() (int, error) {
itr := txn.NewIterator(opt)
defer itr.Close()

start := w.entryKey(0)
start := w.EntryKey(0)
for itr.Seek(start); itr.Valid(); itr.Next() {
count++
}
Expand All @@ -457,7 +457,7 @@ func (w *DiskStorage) NumEntries() (int, error) {
func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr error) {
err := w.db.View(func(txn *badger.Txn) error {
if hi-lo == 1 { // We only need one entry.
item, err := txn.Get(w.entryKey(lo))
item, err := txn.Get(w.EntryKey(lo))
if err != nil {
return err
}
Expand All @@ -476,8 +476,8 @@ func (w *DiskStorage) allEntries(lo, hi, maxSize uint64) (es []pb.Entry, rerr er
itr := txn.NewIterator(iopt)
defer itr.Close()

start := w.entryKey(lo)
end := w.entryKey(hi) // Not included in results.
start := w.EntryKey(lo)
end := w.EntryKey(hi) // Not included in results.

var size, lastIndex uint64
first := true
Expand Down Expand Up @@ -617,7 +617,7 @@ func (w *DiskStorage) addEntries(batch *badger.WriteBatch, entries []pb.Entry) e
// firste can exceed last if Raft makes a jump.

for _, e := range entries {
k := w.entryKey(e.Index)
k := w.EntryKey(e.Index)
data, err := e.Marshal()
if err != nil {
return x.Wrapf(err, "wal.Append: While marshal entry")
Expand Down