Skip to content

Commit

Permalink
Fixed race bugs. Added JSON output option. Added filters for listing …
Browse files Browse the repository at this point in the history
…backups. More tests.
  • Loading branch information
someone1 committed Sep 8, 2017
1 parent 5a7aad1 commit b7d04a4
Show file tree
Hide file tree
Showing 11 changed files with 370 additions and 65 deletions.
1 change: 1 addition & 0 deletions backends/gcs_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ func (g *GoogleCloudStorageBackend) Upload(ctx context.Context, vol *helpers.Vol
if _, err := io.Copy(w, vol); err != nil {
w.Close()
helpers.AppLogger.Debugf("gs backend: Error while uploading volume %s - %v", vol.ObjectName, err)
return err
}
return w.Close()

Expand Down
34 changes: 30 additions & 4 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ import (
)

var (
ErrNoOp = errors.New("nothing new to sync")
ErrNoOp = errors.New("nothing new to sync")
manifestmutex sync.Mutex
)

// ProcessSmartOptions will compute the snapshots to use
Expand Down Expand Up @@ -315,7 +316,9 @@ func Backup(pctx context.Context, jobInfo *helpers.JobInfo) error {
if !vol.IsManifest {
helpers.AppLogger.Debugf("Volume %s has finished the entire pipeline.", vol.ObjectName)
helpers.AppLogger.Debugf("Adding %s to the manifest volume list.", vol.ObjectName)
manifestmutex.Lock()
jobInfo.Volumes = append(jobInfo.Volumes, vol)
manifestmutex.Unlock()
// Write a manifest file and save it locally in order to resume later
manifestVol, err := saveManifest(ctx, jobInfo, false)
if err != nil {
Expand Down Expand Up @@ -347,8 +350,9 @@ func Backup(pctx context.Context, jobInfo *helpers.JobInfo) error {
// TODO: How to incorporate contexts in this go routine?
maniwg.Wait() // Wait until the ZFS send command has completed and all volumes have been uploaded to all backends.
helpers.AppLogger.Infof("All volumes dispatched in pipeline, finalizing manifest file.")

manifestmutex.Lock()
jobInfo.EndTime = time.Now()
manifestmutex.Unlock()
manifestVol, err := saveManifest(ctx, jobInfo, true)
if err != nil {
return err
Expand All @@ -364,7 +368,21 @@ func Backup(pctx context.Context, jobInfo *helpers.JobInfo) error {
}

totalWrittenBytes := jobInfo.TotalBytesWritten()
helpers.AppLogger.Noticef("Done.\n\tTotal ZFS Stream Bytes: %d (%s)\n\tTotal Bytes Written: %d (%s)\n\tElapsed Time: %v\n\tTotal Files Uploaded: %d", jobInfo.ZFSStreamBytes, humanize.IBytes(jobInfo.ZFSStreamBytes), totalWrittenBytes, humanize.IBytes(totalWrittenBytes), time.Since(jobInfo.StartTime), len(jobInfo.Volumes)+1)
if helpers.JSONOutput {
var doneOutput = struct {
TotalZFSBytes uint64
TotalBackupBytes uint64
ElapsedTime time.Duration
FilesUploaded int
}{jobInfo.ZFSStreamBytes, totalWrittenBytes, time.Since(jobInfo.StartTime), len(jobInfo.Volumes) + 1}
if j, jerr := json.Marshal(doneOutput); jerr != nil {
helpers.AppLogger.Errorf("could not ouput json due to error - %v", jerr)
} else {
fmt.Fprintf(helpers.Stdout, "%s", string(j))
}
} else {
fmt.Fprintf(helpers.Stdout, "Done.\n\tTotal ZFS Stream Bytes: %d (%s)\n\tTotal Bytes Written: %d (%s)\n\tElapsed Time: %v\n\tTotal Files Uploaded: %d", jobInfo.ZFSStreamBytes, humanize.IBytes(jobInfo.ZFSStreamBytes), totalWrittenBytes, humanize.IBytes(totalWrittenBytes), time.Since(jobInfo.StartTime), len(jobInfo.Volumes)+1)
}

helpers.AppLogger.Debugf("Cleaning up resources...")

Expand All @@ -378,6 +396,8 @@ func Backup(pctx context.Context, jobInfo *helpers.JobInfo) error {
}

func saveManifest(ctx context.Context, j *helpers.JobInfo, final bool) (*helpers.VolumeInfo, error) {
manifestmutex.Lock()
defer manifestmutex.Unlock()
sort.Sort(helpers.ByVolumeNumber(j.Volumes))

// Setup Manifest File
Expand Down Expand Up @@ -435,7 +455,7 @@ func sendStream(ctx context.Context, j *helpers.JobInfo, c chan<- *helpers.Volum
skipBytes, volNum := j.TotalBytesStreamedAndVols()
lastTotalBytes = skipBytes
for {
// Skipy byes if we are resuming
// Skip bytes if we are resuming
if skipBytes > 0 {
helpers.AppLogger.Debugf("Want to skip %d bytes.", skipBytes)
written, serr := io.CopyN(ioutil.Discard, counter, int64(skipBytes))
Expand Down Expand Up @@ -525,7 +545,9 @@ func sendStream(ctx context.Context, j *helpers.JobInfo, c chan<- *helpers.Volum
}
}()

manifestmutex.Lock()
j.ZFSCommandLine = strings.Join(cmd.Args, " ")
manifestmutex.Unlock()
// Wait for the command to finish

err = group.Wait()
Expand All @@ -534,7 +556,9 @@ func sendStream(ctx context.Context, j *helpers.JobInfo, c chan<- *helpers.Volum
return err
}
helpers.AppLogger.Infof("zfs send completed without error")
manifestmutex.Lock()
j.ZFSStreamBytes = counter.Count()
manifestmutex.Unlock()
return nil
}

Expand Down Expand Up @@ -584,8 +608,10 @@ func tryResume(ctx context.Context, j *helpers.JobInfo) error {
return fmt.Errorf("option mismatch")
}

manifestmutex.Lock()
j.Volumes = originalManifest.Volumes
j.StartTime = originalManifest.StartTime
manifestmutex.Unlock()
helpers.AppLogger.Infof("Will be resuming previous backup attempt.")
}
return nil
Expand Down
72 changes: 55 additions & 17 deletions backup/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"path/filepath"
"sort"
"strings"
"time"

"github.com/someone1/zfsbackup-go/helpers"
)
Expand All @@ -36,7 +37,7 @@ import (
// and then read and output the manifest information describing the backup sets
// found in the target destination.
// TODO: Group by volume name?
func List(pctx context.Context, jobInfo *helpers.JobInfo) error {
func List(pctx context.Context, jobInfo *helpers.JobInfo, startswith string, before, after time.Time) error {
ctx, cancel := context.WithCancel(pctx)
defer cancel()

Expand Down Expand Up @@ -68,28 +69,65 @@ func List(pctx context.Context, jobInfo *helpers.JobInfo) error {
return derr
}

var output []string
output = append(output, fmt.Sprintf("Found %d backup sets:\n", len(decodedManifests)))
// Filter Manifests to only results we care about
filteredResults := decodedManifests[:0]
for _, manifest := range decodedManifests {
output = append(output, manifest.String())
}

if len(localOnlyFiles) > 0 {
output = append(output, fmt.Sprintf("There are %d manifests found locally that are not on the target destination.", len(localOnlyFiles)))
localOnlyOuput := []string{"The following manifests were found locally and can be removed using the clean command."}
for _, filename := range localOnlyFiles {
manifestPath := filepath.Join(localCachePath, filename)
decodedManifest, derr := readManifest(ctx, manifestPath, jobInfo)
if derr != nil {
helpers.AppLogger.Warningf("Could not read local only manifest %s due to error %v", manifestPath, derr)
if startswith != "" {
if startswith[len(startswith)-1:] == "*" {
if len(startswith) != 1 && !strings.HasPrefix(manifest.VolumeName, startswith[:len(startswith)-1]) {
continue
}
} else if strings.Compare(startswith, manifest.VolumeName) != 0 {
continue
}
localOnlyOuput = append(localOnlyOuput, decodedManifest.String())
}
helpers.AppLogger.Infof(strings.Join(localOnlyOuput, "\n"))

if !before.IsZero() && !manifest.BaseSnapshot.CreationTime.Before(before) {
continue
}

if !after.IsZero() && !manifest.BaseSnapshot.CreationTime.After(after) {
continue
}

filteredResults = append(filteredResults, manifest)
}

helpers.AppLogger.Noticef(strings.Join(output, "\n"))
decodedManifests = filteredResults

if !helpers.JSONOutput {
var output []string

output = append(output, fmt.Sprintf("Found %d backup sets:\n", len(decodedManifests)))
for _, manifest := range decodedManifests {
output = append(output, manifest.String())
}

if len(localOnlyFiles) > 0 {
output = append(output, fmt.Sprintf("There are %d manifests found locally that are not on the target destination.", len(localOnlyFiles)))
localOnlyOuput := []string{"The following manifests were found locally and can be removed using the clean command."}
for _, filename := range localOnlyFiles {
manifestPath := filepath.Join(localCachePath, filename)
decodedManifest, derr := readManifest(ctx, manifestPath, jobInfo)
if derr != nil {
helpers.AppLogger.Warningf("Could not read local only manifest %s due to error %v", manifestPath, derr)
continue
}
localOnlyOuput = append(localOnlyOuput, decodedManifest.String())
}
helpers.AppLogger.Infof(strings.Join(localOnlyOuput, "\n"))
}
fmt.Fprintln(helpers.Stdout, strings.Join(output, "\n"))
} else {
organizedManifests := linkManifests(decodedManifests)
j, jerr := json.Marshal(organizedManifests)
if jerr != nil {
helpers.AppLogger.Errorf("could not marshal results to JSON - %v", jerr)
return jerr
}

fmt.Fprintln(helpers.Stdout, string(j))
}

return nil
}
Expand Down
18 changes: 12 additions & 6 deletions backup/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,11 @@ func Receive(pctx context.Context, jobInfo *helpers.JobInfo) error {
retryconf := backoff.WithContext(be, ctx)

operation := func() error {
return processSequence(ctx, sequence, backend, usePipe)
oerr := processSequence(ctx, sequence, backend, usePipe)
if oerr != nil {
helpers.AppLogger.Warningf("error trying to download file %s - %v", sequence.volume.ObjectName, oerr)
}
return oerr
}

helpers.AppLogger.Debugf("Downloading volume %s.", sequence.volume.ObjectName)
Expand Down Expand Up @@ -392,7 +396,6 @@ func processSequence(ctx context.Context, sequence downloadSequence, backend bac
return err
}

defer vol.Close()
vol.ObjectName = sequence.volume.ObjectName
if usePipe {
sequence.c <- vol
Expand All @@ -408,7 +411,10 @@ func processSequence(ctx context.Context, sequence downloadSequence, backend bac
}
return err
}
vol.Close()
if cerr := vol.Close(); cerr != nil {
helpers.AppLogger.Noticef("Could not close temporary file to download %s due to error - %v.", sequence.volume.ObjectName, cerr)
return cerr
}

// Verify the SHA256 Hash, if it doesn't match, ditch it!
if vol.SHA256Sum != sequence.volume.SHA256Sum {
Expand All @@ -419,10 +425,11 @@ func processSequence(ctx context.Context, sequence downloadSequence, backend bac
vol.DeleteVolume()
return fmt.Errorf("SHA256 hash mismatch for %s, got %s but expected %s", sequence.volume.ObjectName, vol.SHA256Sum, sequence.volume.SHA256Sum)
}
helpers.AppLogger.Debugf("Downloaded %s.", sequence.volume.ObjectName)

if !usePipe {
sequence.c <- vol
}
helpers.AppLogger.Debugf("Downloaded %s.", sequence.volume.ObjectName)

return nil
}
Expand Down Expand Up @@ -461,7 +468,6 @@ func receiveStream(ctx context.Context, cmd *exec.Cmd, j *helpers.JobInfo, c <-c
// Extract ZFS stream from files and send it to the zfs command
group.Go(func() error {
defer once.Do(func() { cout.Close() })
buf := make([]byte, 1024*1024)
for {
select {
case vol, ok := <-c:
Expand All @@ -474,7 +480,7 @@ func receiveStream(ctx context.Context, cmd *exec.Cmd, j *helpers.JobInfo, c <-c
helpers.AppLogger.Errorf("Error while trying to read from volume %s - %v", vol.ObjectName, eerr)
return err
}
_, eerr = io.CopyBuffer(cout, vol, buf)
_, eerr = io.Copy(cout, vol)
if eerr != nil {
helpers.AppLogger.Errorf("Error while trying to read from volume %s - %v", vol.ObjectName, eerr)
return eerr
Expand Down
60 changes: 59 additions & 1 deletion cmd/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,20 @@ package cmd

import (
"context"
"time"

"github.com/spf13/cobra"

"github.com/someone1/zfsbackup-go/backup"
"github.com/someone1/zfsbackup-go/helpers"
)

var (
startsWith string
beforeStr string
afterStr string
before time.Time
after time.Time
)

// listCmd represents the list command
Expand All @@ -35,19 +45,67 @@ var listCmd = &cobra.Command{
Long: `List all backup sets found at the provided target.`,
PreRunE: validateListFlags,
RunE: func(cmd *cobra.Command, args []string) error {
if startsWith != "" {
if startsWith[len(startsWith)-1:] == "*" {
helpers.AppLogger.Infof("Listing all backup jobs for volumes starting with %s", startsWith)
} else {
helpers.AppLogger.Infof("Listing all backup jobs for volume %s", startsWith)
}
}

if !before.IsZero() {
helpers.AppLogger.Infof("Listing all back jobs of snapshots taken before %v", before)
}

if !after.IsZero() {
helpers.AppLogger.Infof("Listing all back jobs of snapshots taken after %v", after)
}

jobInfo.Destinations = []string{args[0]}
return backup.List(context.Background(), &jobInfo)
return backup.List(context.Background(), &jobInfo, startsWith, before, after)
},
}

func init() {
RootCmd.AddCommand(listCmd)

listCmd.Flags().StringVar(&startsWith, "volumeName", "", "Filter results to only this volume name, can end with a '*' to match as only a prefix")
listCmd.Flags().StringVar(&beforeStr, "before", "", "Filter results to only this backups before this specified date & time (format: yyyy-MM-ddTHH:mm:ss, parsed in local TZ)")
listCmd.Flags().StringVar(&afterStr, "after", "", "Filter results to only this backups after this specified date & time (format: yyyy-MM-ddTHH:mm:ss, parsed in local TZ)")
}

func validateListFlags(cmd *cobra.Command, args []string) error {
if len(args) != 1 {
cmd.Usage()
return errInvalidInput
}

if beforeStr != "" {
parsed, perr := time.ParseInLocation(time.RFC3339[:19], beforeStr, time.Local)
if perr != nil {
helpers.AppLogger.Errorf("could not parse before time '%s' due to error: %v", beforeStr, perr)
return perr
}
before = parsed
}

if afterStr != "" {
parsed, perr := time.ParseInLocation(time.RFC3339[:19], afterStr, time.Local)
if perr != nil {
helpers.AppLogger.Errorf("could not parse before time '%s' due to error: %v", beforeStr, perr)
return perr
}
after = parsed
}
return nil
}

// ResetListJobInfo exists solely for integration testing
func ResetListJobInfo() {
resetRootFlags()
startsWith = ""
beforeStr = ""
afterStr = ""
before = time.Time{}
after = time.Time{}
}
4 changes: 3 additions & 1 deletion cmd/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ var receiveCmd = &cobra.Command{
Long: `receive will restore a snapshot of a ZFS volume similar to how the "zfs recv" command works.`,
PreRunE: validateReceiveFlags,
RunE: func(cmd *cobra.Command, args []string) error {
helpers.AppLogger.Infof("Limiting the number of active files to %d", jobInfo.MaxFileBuffer)

if jobInfo.AutoRestore {
return backup.AutoRestore(context.Background(), &jobInfo)
}
Expand All @@ -64,7 +66,7 @@ func init() {
receiveCmd.Flags().StringVar(&jobInfo.Separator, "separator", "|", "the separator to use between object component names (used only for the initial manifest we are looking for).")
}

// Exists solely for integration testing
// ResetReceiveJobInfo exists solely for integration testing
func ResetReceiveJobInfo() {
resetRootFlags()
jobInfo.AutoRestore = false
Expand Down
2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func init() {
RootCmd.PersistentFlags().StringVar(&jobInfo.EncryptTo, "encryptTo", "", "the email of the user to encrypt the data to from the provided public keyring.")
RootCmd.PersistentFlags().StringVar(&jobInfo.SignFrom, "signFrom", "", "the email of the user to sign on behalf of from the provided private keyring.")
RootCmd.PersistentFlags().StringVar(&helpers.ZFSPath, "zfsPath", "zfs", "the path to the zfs executable.")
RootCmd.PersistentFlags().BoolVar(&helpers.JSONOutput, "jsonOutput", false, "dump results as a JSON string - on success only")
passphrase = []byte(os.Getenv("PGP_PASSPHRASE"))
}

Expand All @@ -96,6 +97,7 @@ func resetRootFlags() {
jobInfo.EncryptTo = ""
jobInfo.SignFrom = ""
helpers.ZFSPath = "zfs"
helpers.JSONOutput = false
}

func processFlags(cmd *cobra.Command, args []string) error {
Expand Down
Loading

0 comments on commit b7d04a4

Please sign in to comment.