Skip to content

Commit

Permalink
Fixes from testing Azure/B2 backends, added some logging.
Browse files Browse the repository at this point in the history
  • Loading branch information
someone1 committed Sep 8, 2017
1 parent 214c764 commit 5a7aad1
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 10 deletions.
7 changes: 7 additions & 0 deletions backends/azure_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"crypto/md5"
"encoding/base64"
"encoding/binary"
"encoding/hex"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -196,6 +197,12 @@ func (a *AzureBackend) Upload(ctx context.Context, vol *helpers.VolumeInfo) erro
return err
}

md5Raw, merr := hex.DecodeString(vol.MD5Sum)
if merr != nil {
return merr
}
blob.Properties.ContentMD5 = base64.StdEncoding.EncodeToString(md5Raw)

// Finally, finalize the storage blob by giving Azure the block list order
err = blob.PutBlockList(blocks, nil)
if err != nil {
Expand Down
17 changes: 15 additions & 2 deletions backends/backblaze_b2_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"net/http"
"os"
"strings"
"sync"

"github.com/kurin/blazer/b2"

Expand All @@ -39,6 +40,7 @@ const B2BackendPrefix = "b2"
type B2Backend struct {
conf *BackendConfig
bucketCli *b2.Bucket
mutex sync.Mutex
prefix string
bucketName string
}
Expand Down Expand Up @@ -76,7 +78,12 @@ func (b *B2Backend) Init(ctx context.Context, conf *BackendConfig, opts ...Optio
opt.Apply(b)
}

client, err := b2.NewClient(ctx, accountID, accountKey, b2.Transport(bufferedRT{b.conf.MaxParallelUploadBuffer}))
var cliopts []b2.ClientOption
if conf.MaxParallelUploadBuffer != nil {
cliopts = append(cliopts, b2.Transport(bufferedRT{b.conf.MaxParallelUploadBuffer}))
}

client, err := b2.NewClient(ctx, accountID, accountKey, cliopts...)
if err != nil {
return err
}
Expand All @@ -87,16 +94,22 @@ func (b *B2Backend) Init(ctx context.Context, conf *BackendConfig, opts ...Optio
}

_, _, err = b.bucketCli.ListCurrentObjects(ctx, 0, nil)
if err == io.EOF {
err = nil
}
return err
}

// Upload will upload the provided volume to this B2Backend's configured bucket+prefix
func (b *B2Backend) Upload(ctx context.Context, vol *helpers.VolumeInfo) error {
// We will be doing multipart uploads, no need to allow multiple calls of Upload to initiate new uploads.
b.mutex.Lock()
defer b.mutex.Unlock()

name := b.prefix + vol.ObjectName
w := b.bucketCli.Object(name).NewWriter(ctx)

w.ConcurrentUploads = b.conf.MaxParallelUploads
w.Resume = true
w.ChunkSize = b.conf.UploadChunkSize

if _, err := io.Copy(w, vol); err != nil {
Expand Down
7 changes: 3 additions & 4 deletions backends/gcs_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,11 @@ func (g *GoogleCloudStorageBackend) Upload(ctx context.Context, vol *helpers.Vol

objName := g.prefix + vol.ObjectName
w := g.client.NewWriter(ctx, g.bucketName, objName, vol.CRC32CSum32, g.conf.UploadChunkSize)
defer w.Close()
_, err := io.Copy(w, vol)
if err != nil {
if _, err := io.Copy(w, vol); err != nil {
w.Close()
helpers.AppLogger.Debugf("gs backend: Error while uploading volume %s - %v", vol.ObjectName, err)
}
return err
return w.Close()

}

Expand Down
12 changes: 8 additions & 4 deletions backup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ func retryUploadChainer(ctx context.Context, in <-chan *helpers.VolumeInfo, b ba
be.MaxElapsedTime = j.MaxRetryTime
retryconf := backoff.WithContext(be, ctx)

operation := volUploadWrapper(ctx, b, vol)
operation := volUploadWrapper(ctx, b, vol, prefix)
if err := backoff.Retry(operation, retryconf); err != nil {
helpers.AppLogger.Errorf("%s backend: Failed to upload volume %s due to error: %v", prefix, vol.ObjectName, err)
return err
Expand All @@ -642,14 +642,18 @@ func retryUploadChainer(ctx context.Context, in <-chan *helpers.VolumeInfo, b ba
return out, gwg
}

func volUploadWrapper(ctx context.Context, b backends.Backend, vol *helpers.VolumeInfo) func() error {
func volUploadWrapper(ctx context.Context, b backends.Backend, vol *helpers.VolumeInfo, prefix string) func() error {
return func() error {
if err := vol.OpenVolume(); err != nil {
helpers.AppLogger.Warningf("Error while opening volume %s - %v", vol.ObjectName, err)
helpers.AppLogger.Warningf("%s: Error while opening volume %s - %v", prefix, vol.ObjectName, err)
return err
}
defer vol.Close()

return b.Upload(ctx, vol)
err := b.Upload(ctx, vol)
if err != nil {
helpers.AppLogger.Warningf("%s: Error while uploading volume %s - %v", prefix, vol.ObjectName, err)
}
return err
}
}

0 comments on commit 5a7aad1

Please sign in to comment.