Skip to content

Commit

Permalink
Only backup the predicates belonging to a group. (#3621)
Browse files Browse the repository at this point in the history
This PR changes the backup request to include the list of predicates in
the group. This list is used by the ChooseKey function to skip any keys
not belonging to the specified predicates.
  • Loading branch information
martinmr authored Jul 8, 2019
1 parent a67fbfd commit 536cae7
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 242 deletions.
22 changes: 10 additions & 12 deletions dgraph/cmd/alpha/admin_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,17 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
if err := worker.UpdateMembershipState(ctx); err != nil {
return err
}

// Get the current membership state and parse it for easier processing.
state := worker.GetMembershipState()
var groups []uint32
for gid := range state.Groups {
predMap := make(map[uint32][]string)
for gid, group := range state.Groups {
groups = append(groups, gid)
predMap[gid] = make([]string, 0)
for pred := range group.Tablets {
predMap[gid] = append(predMap[gid], pred)
}
}

glog.Infof("Created backup request: %s. Groups=%v\n", &req, groups)
Expand All @@ -128,6 +135,7 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
for _, gid := range groups {
req := req
req.GroupId = gid
req.Predicates = predMap[gid]
go func(req *pb.BackupRequest) {
_, err := worker.BackupGroup(ctx, req)
errCh <- err
Expand All @@ -141,17 +149,7 @@ func processHttpBackupRequest(ctx context.Context, r *http.Request) error {
}
}

// Convert state into a map for writing into the manifest.
manifestGroups := make(map[uint32][]string)
for gid, group := range state.Groups {
var preds []string
for key := range group.Tablets {
preds = append(preds, key)
}
manifestGroups[gid] = preds
}

m := backup.Manifest{Since: req.ReadTs, Groups: manifestGroups}
m := backup.Manifest{Since: req.ReadTs, Groups: predMap}
if req.SinceTs == 0 {
m.Type = "full"
m.BackupId = x.GetRandomName(1)
Expand Down
13 changes: 12 additions & 1 deletion ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/dgraph-io/badger"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/x"
"github.com/golang/glog"
)

Expand All @@ -45,7 +46,7 @@ type Manifest struct {
// because it will become the timestamp from which to backup in the next
// incremental backup.
Since uint64 `json:"since"`
// Groups is the list of valid groups at the time the backup was created.
// Groups is the map of valid groups to predicates at the time the backup was created.
Groups map[uint32][]string `json:"groups"`
// BackupId is a unique ID assigned to all the backups in the same series
// (from the first full backup to the last incremental backup).
Expand Down Expand Up @@ -87,8 +88,18 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) {

glog.V(3).Infof("Backup manifest version: %d", pr.Request.SinceTs)

predMap := make(map[string]struct{})
for _, pred := range pr.Request.Predicates {
predMap[pred] = struct{}{}
}

stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
stream.LogPrefix = "Dgraph.Backup"
stream.ChooseKey = func(item *badger.Item) bool {
parsedKey := x.Parse(item.Key())
_, ok := predMap[parsedKey.Attr]
return ok
}
gzWriter := gzip.NewWriter(handler)
newSince, err := stream.Backup(gzWriter, pr.Request.SinceTs)

Expand Down
4 changes: 4 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,10 @@ message BackupRequest {
// True if no credentials should be used to access the S3 or minio bucket.
// For example, when using a bucket with a public policy.
bool anonymous = 9;

// The predicates to backup. All other predicates present in the group (e.g
// stale data from a predicate move) will be ignored.
repeated string predicates = 10;
}

message ExportRequest {
Expand Down
Loading

0 comments on commit 536cae7

Please sign in to comment.