Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lsbackup command to list backups. #3219

Merged
merged 8 commits into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dgraph/cmd/root_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func init() {
// subcommands already has the default subcommands, we append to EE ones to that.
subcommands = append(subcommands,
&backup.Restore,
&backup.LsBackup,
&acl.CmdAcl,
)
}
8 changes: 8 additions & 0 deletions ee/backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ type Manifest struct {
Groups []uint32 `json:"groups"`
}

// ManifestStatus combines a manifest along with other information about it
// that should not be inside the Manifest struct since it should not be
// recorded in manifest files.
type ManifestStatus struct {
*Manifest
FileName string
}

// GoString implements the GoStringer interface for Manifest.
func (m *Manifest) GoString() string {
return fmt.Sprintf(`Manifest{Version: %d, ReadTs: %d, Groups: %v}`,
Expand Down
6 changes: 3 additions & 3 deletions ee/backup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func RestoreFull(t *testing.T, c *dgo.Dgraph) {

// restore this backup dir (3 files total)
t.Logf("--- Restoring from: %q", dirs[0])
_, err := runRestore("./data/restore", dirs[0])
_, err := runRestoreInternal("./data/restore", dirs[0])
require.NoError(t, err)

// just check p1 which should have the 'movie' predicate (moved during setup)
Expand Down Expand Up @@ -277,7 +277,7 @@ func RestoreIncr1(t *testing.T, c *dgo.Dgraph) {

// restore this backup dir (3 files total)
t.Logf("--- Restoring from: %q", dirs[1])
_, err := runRestore("./data/restore", dirs[1])
_, err := runRestoreInternal("./data/restore", dirs[1])
require.NoError(t, err)

// just check p1 which should have the 'movie' predicate (moved during setup)
Expand Down Expand Up @@ -335,7 +335,7 @@ func RestoreIncr2(t *testing.T, c *dgo.Dgraph) {

// restore this backup dir (3 files total)
t.Logf("--- Restoring from: %q", dirs[2])
_, err := runRestore("./data/restore", dirs[2])
_, err := runRestoreInternal("./data/restore", dirs[2])
require.NoError(t, err)

// just check p1 which should have the 'movie' predicate (moved during setup)
Expand Down
27 changes: 26 additions & 1 deletion ee/backup/file_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
continue
}

// Load the backup for each group in manifest.
// Check the files for each group in the manifest exist.
path := filepath.Dir(manifest)
for _, groupId := range m.Groups {
file := filepath.Join(path, fmt.Sprintf(backupNameFmt, m.ReadTs, groupId))
Expand All @@ -155,6 +155,31 @@ func (h *fileHandler) Load(uri *url.URL, fn loadFn) (uint64, error) {
return version, nil
}

// ListManifests loads the manifests in the locations and returns them.
func (h *fileHandler) ListManifests(uri *url.URL) ([]string, error) {
if !pathExist(uri.Path) {
return nil, x.Errorf("The path %q does not exist or it is inaccessible.", uri.Path)
}

// Get a list of all the manifest files at the location.
suffix := filepath.Join(string(filepath.Separator), backupManifest)
manifests := x.WalkPathFunc(uri.Path, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, suffix)
})
if len(manifests) == 0 {
return nil, x.Errorf("No manifests found at path: %s", uri.Path)
}
sort.Strings(manifests)
if glog.V(3) {
fmt.Printf("Found backup manifest(s): %v\n", manifests)
}
return manifests, nil
}

func (h *fileHandler) ReadManifest(path string, m *Manifest) error {
return h.readManifest(path, m)
}

func (h *fileHandler) Close() error {
if h.fp == nil {
return nil
Expand Down
43 changes: 43 additions & 0 deletions ee/backup/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ type handler interface {
// The loadFn receives the files as they are processed by a handler, to do the actual
// load to DB.
Load(*url.URL, loadFn) (uint64, error)

// ListManifests will scan the provided URI and return the paths to the manifests stored
// in that location.
//
// The URL object is parsed as described in `newHandler`.
ListManifests(*url.URL) ([]string, error)

// ReadManifest will read the manifest at the given location and load it into the given
// Manifest object.
ReadManifest(string, *Manifest) error
}

// getHandler returns a handler for the URI scheme.
Expand Down Expand Up @@ -148,3 +158,36 @@ func Load(l string, fn loadFn) (version uint64, err error) {

return h.Load(uri, fn)
}

// ListManifests scans location l for backup files and returns the list of manifests.
func ListManifests(l string) ([]*ManifestStatus, error) {
uri, err := url.Parse(l)
if err != nil {
return nil, err
}

h := getHandler(uri.Scheme)
if h == nil {
return nil, x.Errorf("Unsupported URI: %v", uri)
}

paths, err := h.ListManifests(uri)
if err != nil {
return nil, err
}

var listedManifests []*ManifestStatus
for _, path := range paths {
var m Manifest
var ms ManifestStatus

if err := h.ReadManifest(path, &m); err != nil {
return nil, x.Wrapf(err, "While reading %q", path)
}
ms.Manifest = &m
ms.FileName = path
listedManifests = append(listedManifests, &ms)
}

return listedManifests, nil
}
78 changes: 76 additions & 2 deletions ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,18 @@ import (
)

var Restore x.SubCommand
var LsBackup x.SubCommand

var opt struct {
location, pdir, zero string
}

func init() {
initRestore()
initBackupLs()
}

func initRestore() {
Restore.Cmd = &cobra.Command{
Use: "restore",
Short: "Run Dgraph (EE) Restore backup",
Expand Down Expand Up @@ -85,7 +91,7 @@ $ dgraph restore -p . -l /var/backups/dgraph -z localhost:5080
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
defer x.StartProfile(Restore.Conf).Stop()
if err := run(); err != nil {
if err := runRestoreCmd(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
Expand All @@ -102,7 +108,56 @@ $ dgraph restore -p . -l /var/backups/dgraph -z localhost:5080
_ = Restore.Cmd.MarkFlagRequired("location")
}

func run() error {
func initBackupLs() {
LsBackup.Cmd = &cobra.Command{
Use: "lsbackup",
Short: "List info on backups in given location",
Long: `
lsbackup looks at a location where backups are stored and prints information about them.

Backups are originated from HTTP at /admin/backup, then can be restored using CLI restore
command. Restore is intended to be used with new Dgraph clusters in offline state.

The --location flag indicates a source URI with Dgraph backup objects. This URI supports all
the schemes used for backup.

Source URI formats:
[scheme]://[host]/[path]?[args]
[scheme]:///[path]?[args]
/[path]?[args] (only for local or NFS)

Source URI parts:
scheme - service handler, one of: "s3", "minio", "file"
host - remote address. ex: "dgraph.s3.amazonaws.com"
path - directory, bucket or container at target. ex: "/dgraph/backups/"
args - specific arguments that are ok to appear in logs.

Dgraph backup creates a unique backup object for each node group, and restore will create
a posting directory 'p' matching the backup group ID. Such that a backup file
named '.../r32-g2.backup' will be loaded to posting dir 'p2'.

Usage examples:

# Run using location in S3:
$ dgraph lsbackup -l s3://s3.us-west-2.amazonaws.com/srfrog/dgraph
`,
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
defer x.StartProfile(Restore.Conf).Stop()
if err := runLsbackupCmd(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
},
}

flag := LsBackup.Cmd.Flags()
flag.StringVarP(&opt.location, "location", "l", "",
"Sets the source location URI (required).")
_ = LsBackup.Cmd.MarkFlagRequired("location")
}

func runRestoreCmd() error {
var (
start time.Time
zc pb.ZeroClient
Expand Down Expand Up @@ -183,3 +238,22 @@ func runRestore(pdir, location string) (uint64, error) {
return db.Load(r)
})
}

func runLsbackupCmd() error {
fmt.Println("Listing backups from:", opt.location)
manifests, err := ListManifests(opt.location)
if err != nil {
return x.Errorf("Error while listing manifests: %v", err.Error())
}

fmt.Printf("Name\tVersion\tReadTs\tGroups\n")
for _, manifest := range manifests {
fmt.Printf("%v\t%v\t%v\t%v\n",
manifest.FileName,
manifest.Version,
manifest.ReadTs,
manifest.Groups)
}

return nil
}
38 changes: 38 additions & 0 deletions ee/backup/s3_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type s3Handler struct {
pwriter *io.PipeWriter
preader *io.PipeReader
cerr chan error
uri *url.URL
}

// setup creates a new session, checks valid bucket at uri.Path, and configures a minio client.
Expand Down Expand Up @@ -275,6 +276,43 @@ func (h *s3Handler) Load(uri *url.URL, fn loadFn) (uint64, error) {
return version, nil
}

// ListManifests loads the manifests in the locations and returns them.
func (h *s3Handler) ListManifests(uri *url.URL) ([]string, error) {
mc, err := h.setup(uri)
if err != nil {
return nil, err
}
h.uri = uri

var manifests []string
doneCh := make(chan struct{})
defer close(doneCh)

suffix := "/" + backupManifest
for object := range mc.ListObjects(h.bucketName, h.objectPrefix, true, doneCh) {
if strings.HasSuffix(object.Key, suffix) {
manifests = append(manifests, object.Key)
}
}
if len(manifests) == 0 {
return nil, x.Errorf("No manifests found at: %s", uri.String())
}
sort.Strings(manifests)
if glog.V(3) {
fmt.Printf("Found backup manifest(s) %s: %v\n", uri.Scheme, manifests)
}
return manifests, nil
}

func (h *s3Handler) ReadManifest(path string, m *Manifest) error {
mc, err := h.setup(h.uri)
if err != nil {
return err
}

return h.readManifest(mc, path, m)
}

// upload will block until it's done or an error occurs.
func (h *s3Handler) upload(mc *minio.Client, object string) error {
start := time.Now()
Expand Down