From 20dcc597f019c011fe21443940a509600f48834b Mon Sep 17 00:00:00 2001
From: Martin Martinez Rivera
Date: Tue, 9 Jul 2019 15:50:43 -0700
Subject: [PATCH 1/6] Use backwards-compatible formats during backup.

This change converts the keys and posting lists to a backwards-compatible
format so that backups work across versions of Dgraph.

The restore logic is also changed to convert the data back to the internal
Dgraph formats.
---
 ee/backup/backup.go  | 148 ++++++++++++++++++++++++++++++++++++++++--
 ee/backup/restore.go | 150 +++++++++++++++++++++++++++++++++++++++++++
 ee/backup/run.go     |  33 ----------
 3 files changed, 292 insertions(+), 39 deletions(-)
 create mode 100644 ee/backup/restore.go

diff --git a/ee/backup/backup.go b/ee/backup/backup.go
index 4b92b548dc5..b5a7a8bd467 100644
--- a/ee/backup/backup.go
+++ b/ee/backup/backup.go
@@ -13,17 +13,25 @@ package backup
 
 import (
+	"bytes"
 	"compress/gzip"
 	"context"
+	"encoding/binary"
+	"encoding/hex"
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/url"
 	"sync"
 
 	"github.com/dgraph-io/badger"
+	bpb "github.com/dgraph-io/badger/pb"
+	"github.com/golang/glog"
+	"github.com/pkg/errors"
+
+	"github.com/dgraph-io/dgraph/posting"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
-	"github.com/golang/glog"
 )
 
 // Processor handles the different stages of the backup process.
@@ -93,24 +101,33 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {
 		predMap[pred] = struct{}{}
 	}
 
+	var maxVersion uint64
+
+	gzWriter := gzip.NewWriter(handler)
 	stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
 	stream.LogPrefix = "Dgraph.Backup"
+	stream.KeyToList = toBackupList(pr.Request.SinceTs)
 	stream.ChooseKey = func(item *badger.Item) bool {
 		parsedKey := x.Parse(item.Key())
 		_, ok := predMap[parsedKey.Attr]
 		return ok
 	}
-	gzWriter := gzip.NewWriter(handler)
-	newSince, err := stream.Backup(gzWriter, pr.Request.SinceTs)
+	stream.Send = func(list *bpb.KVList) error {
+		for _, kv := range list.Kv {
+			if maxVersion < kv.Version {
+				maxVersion = kv.Version
+			}
+		}
+		return writeKVList(list, gzWriter)
+	}
 
-	if err != nil {
+	if err := stream.Orchestrate(context.Background()); err != nil {
 		glog.Errorf("While taking backup: %v", err)
 		return &emptyRes, err
 	}
 
-	if newSince > pr.Request.ReadTs {
+	if maxVersion > pr.Request.ReadTs {
 		glog.Errorf("Max timestamp seen during backup (%d) is greater than readTs (%d)",
-			newSince, pr.Request.ReadTs)
+			maxVersion, pr.Request.ReadTs)
 	}
 
 	glog.V(2).Infof("Backup group %d version: %d", pr.Request.GroupId, pr.Request.ReadTs)
@@ -161,3 +178,122 @@ func (pr *Processor) CompleteBackup(ctx context.Context, manifest *Manifest) err
 func (m *Manifest) GoString() string {
 	return fmt.Sprintf(`Manifest{Since: %d, Groups: %v}`, m.Since, m.Groups)
 }
+
+func toBackupList(since uint64) func([]byte, *badger.Iterator) (*bpb.KVList, error) {
+	return func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
+		list := &bpb.KVList{}
+
+	loop:
+		for itr.Valid() {
+			item := itr.Item()
+			if !bytes.Equal(item.Key(), key) {
+				break
+			}
+			if item.Version() < since {
+				// Ignore versions less than given timestamp, or skip older versions of
+				// the given key.
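+				// Badger iterates the versions of a key from newest to oldest,
+				// so every remaining version is older than since as well.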
+ break + } + + switch item.UserMeta() { + case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting: + l, err := posting.ReadPostingList(key, itr) + kvs, err := l.Rollup() + if err != nil { + return nil, errors.Wrapf(err, "while rolling up list") + } + + for _, kv := range kvs { + backupKey, err := toBackupKey(kv.Key) + if err != nil { + return nil, err + } + kv.Key = backupKey + + backupPl, err := toBackupPostingList(kv.Value) + if err != nil { + return nil, err + } + kv.Value = backupPl + } + list.Kv = append(list.Kv, kvs...) + + case posting.BitSchemaPosting: + var valCopy []byte + if !item.IsDeletedOrExpired() { + // No need to copy value if item is deleted or expired. + var err error + valCopy, err = item.ValueCopy(nil) + if err != nil { + return nil, errors.Wrapf(err, "while copying value") + } + } + + backupKey, err := toBackupKey(key) + if err != nil { + return nil, err + } + + kv := &bpb.KV{ + Key: backupKey, + Value: valCopy, + UserMeta: []byte{item.UserMeta()}, + Version: item.Version(), + ExpiresAt: item.ExpiresAt(), + } + list.Kv = append(list.Kv, kv) + + if item.DiscardEarlierVersions() || item.IsDeletedOrExpired() { + break loop + } + + // Manually advance the iterator. This cannot be done in the for + // statement because ReadPostingList advances the iterator so this + // only needs to be done for BitSchemaPosting entries. + itr.Next() + + default: + return nil, errors.Errorf( + "Unexpected meta: %d for key: %s", item.UserMeta(), hex.Dump(key)) + } + } + + return list, nil + } +} + +func toBackupKey(key []byte) ([]byte, error) { + parsedKey := x.Parse(key) + if parsedKey == nil { + return nil, errors.Errorf("could not parse key %s", hex.Dump(key)) + } + backupKey, err := parsedKey.ToBackupKey().Marshal() + if err != nil { + return nil, errors.Wrapf(err, "while converting key for backup") + } + return backupKey, nil +} + +func toBackupPostingList(val []byte) ([]byte, error) { + pl := &pb.PostingList{} + if err := pl.Unmarshal(val); err != nil { + return nil, errors.Wrapf(err, "while reading posting list") + } + backupVal, err := posting.ToBackupPostingList(pl).Marshal() + if err != nil { + return nil, errors.Wrapf(err, "while converting posting list for backup") + } + return backupVal, nil +} + +func writeKVList(list *bpb.KVList, w io.Writer) error { + if err := binary.Write(w, binary.LittleEndian, uint64(list.Size())); err != nil { + return err + } + buf, err := list.Marshal() + if err != nil { + return err + } + _, err = w.Write(buf) + return err +} diff --git a/ee/backup/restore.go b/ee/backup/restore.go new file mode 100644 index 00000000000..676d85b5927 --- /dev/null +++ b/ee/backup/restore.go @@ -0,0 +1,150 @@ +// +build !oss + +/* + * Copyright 2019 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Dgraph Community License (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt + */ + +package backup + +import ( + "bufio" + "compress/gzip" + "encoding/binary" + "encoding/hex" + "fmt" + "io" + "math" + "path/filepath" + + "github.com/dgraph-io/badger" + "github.com/dgraph-io/badger/options" + bpb "github.com/dgraph-io/badger/pb" + "github.com/pkg/errors" + + "github.com/dgraph-io/dgraph/posting" + "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/x" +) + +// RunRestore calls badger.Load and tries to load data into a new DB. +func RunRestore(pdir, location, backupId string) (uint64, error) { + // Scan location for backup files and load them. Each file represents a node group, + // and we create a new p dir for each. + return Load(location, backupId, func(r io.Reader, groupId int) error { + dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId)) + db, err := badger.OpenManaged(badger.DefaultOptions(dir). + WithSyncWrites(true). + WithTableLoadingMode(options.MemoryMap). + WithValueThreshold(1 << 10). + WithNumVersionsToKeep(math.MaxInt32)) + if err != nil { + return err + } + defer db.Close() + fmt.Printf("Restoring groupId: %d\n", groupId) + if !pathExist(dir) { + fmt.Println("Creating new db:", dir) + } + gzReader, err := gzip.NewReader(r) + if err != nil { + return nil + } + return loadFromBackup(db, gzReader, 16) + }) +} + +// loadFromBackup reads the backup, converts the keys and values to the required format, +// and loads them to the given badger DB. +func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int) error { + br := bufio.NewReaderSize(r, 16<<10) + unmarshalBuf := make([]byte, 1<<10) + + loader := db.NewKVLoader(maxPendingWrites) + for { + var sz uint64 + err := binary.Read(br, binary.LittleEndian, &sz) + if err == io.EOF { + break + } else if err != nil { + return err + } + + if cap(unmarshalBuf) < int(sz) { + unmarshalBuf = make([]byte, sz) + } + + if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil { + return err + } + + list := &bpb.KVList{} + if err := list.Unmarshal(unmarshalBuf[:sz]); err != nil { + return err + } + + for _, kv := range list.Kv { + if len(kv.GetUserMeta()) != 1 { + return errors.Errorf( + "Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key)) + } + + var restoreKey []byte + var restoreVal []byte + switch kv.GetUserMeta()[0] { + case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting: + var err error + restoreKey, err = fromBackupKey(kv.Key) + if err != nil { + return err + } + + backupPl := &pb.BackupPostingList{} + if err := backupPl.Unmarshal(kv.Value); err != nil { + return errors.Wrapf(err, "while reading backup posting list") + } + restoreVal, err = posting.FromBackupPostingList(backupPl).Marshal() + if err != nil { + return errors.Wrapf(err, "while converting backup posting list") + } + + case posting.BitSchemaPosting: + var err error + restoreKey, err = fromBackupKey(kv.Key) + if err != nil { + return err + } + restoreVal = kv.Value + + default: + return errors.Errorf( + "Unexpected meta %d for key %s", kv.UserMeta[0], hex.Dump(kv.Key)) + } + + kv.Key = restoreKey + kv.Value = restoreVal + if err := loader.Set(kv); err != nil { + return err + } + } + } + + if err := loader.Finish(); err != nil { + return err + } + + return nil +} + +func fromBackupKey(key []byte) ([]byte, error) { + backupKey := &pb.BackupKey{} + if err := backupKey.Unmarshal(key); err != nil { + 
return nil, errors.Wrapf(err, "while reading backup key %s", hex.Dump(key))
+	}
+	return x.FromBackupKey(backupKey), nil
+}
diff --git a/ee/backup/run.go b/ee/backup/run.go
index 3e2fc3daf66..d348f77ec9a 100644
--- a/ee/backup/run.go
+++ b/ee/backup/run.go
@@ -13,17 +13,11 @@ package backup
 
 import (
-	"compress/gzip"
 	"context"
 	"fmt"
-	"io"
-	"math"
 	"os"
-	"path/filepath"
 	"time"
 
-	"github.com/dgraph-io/badger"
-	"github.com/dgraph-io/badger/options"
 	"github.com/dgraph-io/dgraph/protos/pb"
 	"github.com/dgraph-io/dgraph/x"
 	"github.com/pkg/errors"
@@ -213,33 +207,6 @@ func runRestoreCmd() error {
 	return nil
 }
 
-// RunRestore calls badger.Load and tries to load data into a new DB.
-func RunRestore(pdir, location, backupId string) (uint64, error) {
-	// Scan location for backup files and load them. Each file represents a node group,
-	// and we create a new p dir for each.
-	return Load(location, backupId, func(r io.Reader, groupId int) error {
-		dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
-		db, err := badger.OpenManaged(badger.DefaultOptions(dir).
-			WithSyncWrites(true).
-			WithTableLoadingMode(options.MemoryMap).
-			WithValueThreshold(1 << 10).
-			WithNumVersionsToKeep(math.MaxInt32))
-		if err != nil {
-			return err
-		}
-		defer db.Close()
-		fmt.Printf("Restoring groupId: %d\n", groupId)
-		if !pathExist(dir) {
-			fmt.Println("Creating new db:", dir)
-		}
-		gzReader, err := gzip.NewReader(r)
-		if err != nil {
-			return nil
-		}
-		return db.Load(gzReader, 16)
-	})
-}
-
 func runLsbackupCmd() error {
 	fmt.Println("Listing backups from:", opt.location)
 	manifests, err := ListManifests(opt.location)

From 5c5c0eaa64865376c195c5ae9732700051d69acd Mon Sep 17 00:00:00 2001
From: Martin Martinez Rivera
Date: Tue, 9 Jul 2019 15:55:33 -0700
Subject: [PATCH 2/6] Fix ineffassign warning.

---
 ee/backup/backup.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/ee/backup/backup.go b/ee/backup/backup.go
index b5a7a8bd467..563a20d5d48 100644
--- a/ee/backup/backup.go
+++ b/ee/backup/backup.go
@@ -198,6 +198,9 @@ func toBackupList(since uint64) func([]byte, *badger.Iterator) (*bpb.KVList, err
 		switch item.UserMeta() {
 		case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting:
 			l, err := posting.ReadPostingList(key, itr)
+			if err != nil {
+				return nil, errors.Wrapf(err, "while reading posting list")
+			}
 			kvs, err := l.Rollup()
 			if err != nil {
 				return nil, errors.Wrapf(err, "while rolling up list")

From 13ec89f71df2fafaa26446d656dc811dc189efbe Mon Sep 17 00:00:00 2001
From: Martin Martinez Rivera
Date: Wed, 10 Jul 2019 13:54:12 -0700
Subject: [PATCH 3/6] Simplify restore logic.
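
Both branches of the switch converted the key with fromBackupKey, so the
conversion is hoisted out of the switch statement and only the value
conversion remains case-specific.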
--- ee/backup/restore.go | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/ee/backup/restore.go b/ee/backup/restore.go index 676d85b5927..5f9db16bfdc 100644 --- a/ee/backup/restore.go +++ b/ee/backup/restore.go @@ -94,16 +94,15 @@ func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int) error { "Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key)) } - var restoreKey []byte + restoreKey, err := fromBackupKey(kv.Key) + if err != nil { + return err + } + var restoreVal []byte switch kv.GetUserMeta()[0] { case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting: var err error - restoreKey, err = fromBackupKey(kv.Key) - if err != nil { - return err - } - backupPl := &pb.BackupPostingList{} if err := backupPl.Unmarshal(kv.Value); err != nil { return errors.Wrapf(err, "while reading backup posting list") @@ -114,11 +113,6 @@ func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int) error { } case posting.BitSchemaPosting: - var err error - restoreKey, err = fromBackupKey(kv.Key) - if err != nil { - return err - } restoreVal = kv.Value default: From 2ce9fad0ab28c4433f049d1acd1d62b08d5b279f Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Wed, 10 Jul 2019 14:53:01 -0700 Subject: [PATCH 4/6] Use manifest to only restore preds assigned to each group. This PR changes the restore logic so that only the keys corresponding to the predicates that were assigned to the group at the time of the last backup are restored. This change is intended to properly deal with predicate moves. --- ee/backup/backup.go | 13 +++++++++++++ ee/backup/file_handler.go | 10 +++++++--- ee/backup/handler.go | 8 ++++++-- ee/backup/restore.go | 17 ++++++++++++++--- ee/backup/s3_handler.go | 11 ++++++++--- 5 files changed, 48 insertions(+), 11 deletions(-) diff --git a/ee/backup/backup.go b/ee/backup/backup.go index 563a20d5d48..1ff592f95e7 100644 --- a/ee/backup/backup.go +++ b/ee/backup/backup.go @@ -69,6 +69,19 @@ type Manifest struct { Path string `json:"-"` } +func (m *Manifest) getPredsInGroup(gid uint32) predicateSet { + preds, ok := m.Groups[gid] + if !ok { + return nil + } + + predSet := make(predicateSet) + for _, pred := range preds { + predSet[pred] = struct{}{} + } + return predSet +} + // WriteBackup uses the request values to create a stream writer then hand off the data // retrieval to stream.Orchestrate. The writer will create all the fd's needed to // collect the data and later move to the target. diff --git a/ee/backup/file_handler.go b/ee/backup/file_handler.go index fb8b12debfd..ce3836691ac 100644 --- a/ee/backup/file_handler.go +++ b/ee/backup/file_handler.go @@ -157,14 +157,18 @@ func (h *fileHandler) Load(uri *url.URL, backupId string, fn loadFn) (uint64, er } path := filepath.Dir(manifests[i].Path) - for groupId := range manifest.Groups { - file := filepath.Join(path, backupName(manifest.Since, groupId)) + for gid := range manifest.Groups { + file := filepath.Join(path, backupName(manifest.Since, gid)) fp, err := os.Open(file) if err != nil { return 0, errors.Wrapf(err, "Failed to open %q", file) } defer fp.Close() - if err = fn(fp, int(groupId)); err != nil { + + // Only restore the predicates that were assigned to this group at the time + // of the last backup. 
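+			// manifests[len(manifests)-1] is the most recent manifest, so this
+			// avoids restoring predicates that have since moved to another group.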
+ predSet := manifests[len(manifests)-1].getPredsInGroup(gid) + if err = fn(fp, int(gid), predSet); err != nil { return 0, err } } diff --git a/ee/backup/handler.go b/ee/backup/handler.go index e13f59198fd..bb5d0fe57e2 100644 --- a/ee/backup/handler.go +++ b/ee/backup/handler.go @@ -128,9 +128,13 @@ func NewUriHandler(uri *url.URL) (UriHandler, error) { return h, nil } +// predicateSet is a map whose keys are predicates. It is meant to be used as a set. +type predicateSet map[string]struct{} + // loadFn is a function that will receive the current file being read. -// A reader and the backup groupId are passed as arguments. -type loadFn func(reader io.Reader, groupId int) error +// A reader, the backup groupId, and a map whose keys are the predicates to restore +// are passed as arguments. +type loadFn func(reader io.Reader, groupId int, preds predicateSet) error // Load will scan location l for backup files in the given backup series and load them // sequentially. Returns the maximum Since value on success, otherwise an error. diff --git a/ee/backup/restore.go b/ee/backup/restore.go index 5f9db16bfdc..8e2d42d76dd 100644 --- a/ee/backup/restore.go +++ b/ee/backup/restore.go @@ -36,7 +36,7 @@ import ( func RunRestore(pdir, location, backupId string) (uint64, error) { // Scan location for backup files and load them. Each file represents a node group, // and we create a new p dir for each. - return Load(location, backupId, func(r io.Reader, groupId int) error { + return Load(location, backupId, func(r io.Reader, groupId int, preds predicateSet) error { dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId)) db, err := badger.OpenManaged(badger.DefaultOptions(dir). WithSyncWrites(true). @@ -55,13 +55,13 @@ func RunRestore(pdir, location, backupId string) (uint64, error) { if err != nil { return nil } - return loadFromBackup(db, gzReader, 16) + return loadFromBackup(db, gzReader, 16, preds) }) } // loadFromBackup reads the backup, converts the keys and values to the required format, // and loads them to the given badger DB. -func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int) error { +func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int, preds predicateSet) error { br := bufio.NewReaderSize(r, 16<<10) unmarshalBuf := make([]byte, 1<<10) @@ -99,6 +99,17 @@ func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int) error { return err } + // Filter keys using the preds set. Do not do this filtering for type keys + // as they are meant to be in every group and their Attr value does not + // match a predicate name. 
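+			// For example, a key for the predicate "name" is only loaded when
+			// "name" appears in preds, while a type key is always loaded.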
+ parsedKey := x.Parse(restoreKey) + if parsedKey == nil { + return errors.Errorf("could not parse key %s", hex.Dump(restoreKey)) + } + if _, ok := preds[parsedKey.Attr]; !parsedKey.IsType() && !ok { + continue + } + var restoreVal []byte switch kv.GetUserMeta()[0] { case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting: diff --git a/ee/backup/s3_handler.go b/ee/backup/s3_handler.go index b8912073b31..638db33d0ea 100644 --- a/ee/backup/s3_handler.go +++ b/ee/backup/s3_handler.go @@ -295,13 +295,14 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, fn loadFn) (uint64, erro } path := filepath.Dir(manifests[i].Path) - for groupId := range manifest.Groups { - object := filepath.Join(path, backupName(manifest.Since, groupId)) + for gid := range manifest.Groups { + object := filepath.Join(path, backupName(manifest.Since, gid)) reader, err := mc.GetObject(h.bucketName, object, minio.GetObjectOptions{}) if err != nil { return 0, errors.Wrapf(err, "Failed to get %q", object) } defer reader.Close() + st, err := reader.Stat() if err != nil { return 0, errors.Wrapf(err, "Stat failed %q", object) @@ -310,7 +311,11 @@ func (h *s3Handler) Load(uri *url.URL, backupId string, fn loadFn) (uint64, erro return 0, errors.Errorf("Remote object is empty or inaccessible: %s", object) } fmt.Printf("Downloading %q, %d bytes\n", object, st.Size) - if err = fn(reader, int(groupId)); err != nil { + + // Only restore the predicates that were assigned to this group at the time + // of the last backup. + predSet := manifests[len(manifests)-1].getPredsInGroup(gid) + if err = fn(reader, int(gid), predSet); err != nil { return 0, err } } From 65393f42edb162f489413254270a57d3d5bceff5 Mon Sep 17 00:00:00 2001 From: Martin Martinez Rivera Date: Thu, 11 Jul 2019 14:38:07 -0700 Subject: [PATCH 5/6] Addressed review comments. --- ee/backup/backup.go | 135 +++++++++++++++++++++---------------------- ee/backup/restore.go | 2 +- 2 files changed, 68 insertions(+), 69 deletions(-) diff --git a/ee/backup/backup.go b/ee/backup/backup.go index 563a20d5d48..28652ff542c 100644 --- a/ee/backup/backup.go +++ b/ee/backup/backup.go @@ -105,7 +105,7 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) { gzWriter := gzip.NewWriter(handler) stream := pr.DB.NewStreamAt(pr.Request.ReadTs) stream.LogPrefix = "Dgraph.Backup" - stream.KeyToList = toBackupList(pr.Request.SinceTs) + stream.KeyToList = pr.toBackupList stream.ChooseKey = func(item *badger.Item) bool { parsedKey := x.Parse(item.Key()) _, ok := predMap[parsedKey.Attr] @@ -179,90 +179,89 @@ func (m *Manifest) GoString() string { return fmt.Sprintf(`Manifest{Since: %d, Groups: %v}`, m.Since, m.Groups) } -func toBackupList(since uint64) func([]byte, *badger.Iterator) (*bpb.KVList, error) { - return func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { - list := &bpb.KVList{} +func (pr *Processor) toBackupList(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { + list := &bpb.KVList{} - loop: - for itr.Valid() { - item := itr.Item() - if !bytes.Equal(item.Key(), key) { - break + for itr.Valid() { + item := itr.Item() + if !bytes.Equal(item.Key(), key) { + return list, nil + } + if item.Version() < pr.Request.SinceTs { + // Ignore versions less than given timestamp, or skip older versions of + // the given key. 
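+			// Since versions are iterated from newest to oldest, the remaining
+			// versions of this key can be skipped as well.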
+ return list, nil + } + + switch item.UserMeta() { + case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting: + l, err := posting.ReadPostingList(key, itr) + if err != nil { + return nil, errors.Wrapf(err, "while reading posting list") } - if item.Version() < since { - // Ignore versions less than given timestamp, or skip older versions of - // the given key. - break + kvs, err := l.Rollup() + if err != nil { + return nil, errors.Wrapf(err, "while rolling up list") } - switch item.UserMeta() { - case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting: - l, err := posting.ReadPostingList(key, itr) - if err != nil { - return nil, errors.Wrapf(err, "while reading posting list") - } - kvs, err := l.Rollup() + for _, kv := range kvs { + backupKey, err := toBackupKey(kv.Key) if err != nil { - return nil, errors.Wrapf(err, "while rolling up list") - } - - for _, kv := range kvs { - backupKey, err := toBackupKey(kv.Key) - if err != nil { - return nil, err - } - kv.Key = backupKey - - backupPl, err := toBackupPostingList(kv.Value) - if err != nil { - return nil, err - } - kv.Value = backupPl - } - list.Kv = append(list.Kv, kvs...) - - case posting.BitSchemaPosting: - var valCopy []byte - if !item.IsDeletedOrExpired() { - // No need to copy value if item is deleted or expired. - var err error - valCopy, err = item.ValueCopy(nil) - if err != nil { - return nil, errors.Wrapf(err, "while copying value") - } + return nil, err } + kv.Key = backupKey - backupKey, err := toBackupKey(key) + backupPl, err := toBackupPostingList(kv.Value) if err != nil { return nil, err } - - kv := &bpb.KV{ - Key: backupKey, - Value: valCopy, - UserMeta: []byte{item.UserMeta()}, - Version: item.Version(), - ExpiresAt: item.ExpiresAt(), + kv.Value = backupPl + } + list.Kv = append(list.Kv, kvs...) + + case posting.BitSchemaPosting: + var valCopy []byte + if !item.IsDeletedOrExpired() { + // No need to copy value if item is deleted or expired. + var err error + valCopy, err = item.ValueCopy(nil) + if err != nil { + return nil, errors.Wrapf(err, "while copying value") } - list.Kv = append(list.Kv, kv) + } - if item.DiscardEarlierVersions() || item.IsDeletedOrExpired() { - break loop - } + backupKey, err := toBackupKey(key) + if err != nil { + return nil, err + } - // Manually advance the iterator. This cannot be done in the for - // statement because ReadPostingList advances the iterator so this - // only needs to be done for BitSchemaPosting entries. - itr.Next() + kv := &bpb.KV{ + Key: backupKey, + Value: valCopy, + UserMeta: []byte{item.UserMeta()}, + Version: item.Version(), + ExpiresAt: item.ExpiresAt(), + } + list.Kv = append(list.Kv, kv) - default: - return nil, errors.Errorf( - "Unexpected meta: %d for key: %s", item.UserMeta(), hex.Dump(key)) + if item.DiscardEarlierVersions() || item.IsDeletedOrExpired() { + return list, nil } - } - return list, nil + // Manually advance the iterator. This cannot be done in the for + // statement because ReadPostingList advances the iterator so this + // only needs to be done for BitSchemaPosting entries. + itr.Next() + + default: + return nil, errors.Errorf( + "Unexpected meta: %d for key: %s", item.UserMeta(), hex.Dump(key)) + } } + + // This shouldn't be reached but it's being added here because the golang + // compiler complains about the missing return statement. 
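+	// In practice it is only hit when this key is the last one seen by
+	// the iterator.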
+	return list, nil
+}
 
 func toBackupKey(key []byte) ([]byte, error) {
diff --git a/ee/backup/restore.go b/ee/backup/restore.go
index 5f9db16bfdc..225340199e4 100644
--- a/ee/backup/restore.go
+++ b/ee/backup/restore.go
@@ -39,7 +39,7 @@ func RunRestore(pdir, location, backupId string) (uint64, error) {
 	return Load(location, backupId, func(r io.Reader, groupId int) error {
 		dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId))
 		db, err := badger.OpenManaged(badger.DefaultOptions(dir).
-			WithSyncWrites(true).
+			WithSyncWrites(false).
 			WithTableLoadingMode(options.MemoryMap).
 			WithValueThreshold(1 << 10).
 			WithNumVersionsToKeep(math.MaxInt32))

From 7a2819317a2bf9d81d2427b1e9fb1830e6204d56 Mon Sep 17 00:00:00 2001
From: Martin Martinez Rivera
Date: Thu, 11 Jul 2019 14:51:24 -0700
Subject: [PATCH 6/6] Address review comments.

---
 ee/backup/restore.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/ee/backup/restore.go b/ee/backup/restore.go
index 018c1530d8b..9e8afe13473 100644
--- a/ee/backup/restore.go
+++ b/ee/backup/restore.go
@@ -55,17 +55,17 @@ func RunRestore(pdir, location, backupId string) (uint64, error) {
 		if err != nil {
 			return nil
 		}
-		return loadFromBackup(db, gzReader, 16, preds)
+		return loadFromBackup(db, gzReader, preds)
 	})
 }
 
 // loadFromBackup reads the backup, converts the keys and values to the required format,
 // and loads them to the given badger DB.
-func loadFromBackup(db *badger.DB, r io.Reader, maxPendingWrites int, preds predicateSet) error {
+func loadFromBackup(db *badger.DB, r io.Reader, preds predicateSet) error {
 	br := bufio.NewReaderSize(r, 16<<10)
 	unmarshalBuf := make([]byte, 1<<10)
 
-	loader := db.NewKVLoader(maxPendingWrites)
+	loader := db.NewKVLoader(16)
 	for {
 		var sz uint64
 		err := binary.Read(br, binary.LittleEndian, &sz)