From 497943d56d421c6e4984461add438a12a22936c1 Mon Sep 17 00:00:00 2001
From: Yevgeniy Valeyev
Date: Mon, 2 Sep 2019 22:56:15 +0200
Subject: [PATCH 1/8] Introduce a separate queue for checksum comparison in order to improve async processing and reduce memory usage

---
 localfiles.go | 29 +++++++----------------------
 main.go       | 30 ++++++++++++++++++++++++++----
 s3sync.go     |  4 ++--
 3 files changed, 35 insertions(+), 28 deletions(-)

diff --git a/localfiles.go b/localfiles.go
index 262fb050..2e5d17fb 100644
--- a/localfiles.go
+++ b/localfiles.go
@@ -36,11 +36,9 @@ func checkIfExcluded(path string, exclusions []string) bool {
 }
 
 // FilePathWalkDir walks through the directory and all subdirectories, returning a list of files for upload and a list of files to be deleted from S3
-func FilePathWalkDir(site Site, awsItems map[string]string, s3Service *s3.S3, uploadCh chan<- UploadCFG) ([]string, error) {
+func FilePathWalkDir(site Site, awsItems map[string]string, s3Service *s3.S3, checksumCh chan<- ChecksumCFG) ([]string, error) {
     var deleteKeys []string
     var localS3Keys []string
-    var filescount int
-    ch := make(chan string)
 
     err := filepath.Walk(site.LocalPath, func(path string, info os.FileInfo, err error) error {
         if err != nil {
@@ -52,26 +50,15 @@ func FilePathWalkDir(site Site, awsItems map[string]string, s3Service *s3.S3, up
         if excluded {
             logger.Printf("skipping without errors: %+v \n", path)
         } else {
-            filescount++
             s3Key := generateS3Key(site.BucketPath, site.LocalPath, path)
             localS3Keys = append(localS3Keys, s3Key)
             checksumRemote, _ := awsItems[s3Key]
-            go compareChecksum(path, checksumRemote, ch)
+            checksumCh <- ChecksumCFG{UploadCFG{s3Service, path, site}, path, checksumRemote}
         }
         return nil
     })
 
-    // Wait for checksums to be compared
-    var filename string
-    for i := 0; i < filescount; i++ {
-        filename = <-ch
-        if len(filename) > 0 {
-            // Add file to the upload queue
-            uploadCh <- UploadCFG{s3Service, filename, site}
-        }
-    }
-
     // Generate a list of deleted files
     for key := range awsItems {
         if !checkIfInList(key, localS3Keys) {
@@ -82,10 +69,9 @@ func FilePathWalkDir(site Site, awsItems map[string]string, s3Service *s3.S3, up
     return deleteKeys, err
 }
 
-func compareChecksum(filename string, checksumRemote string, ch chan<- string) {
+func compareChecksum(filename string, checksumRemote string) string {
     if checksumRemote == "" {
-        ch <- filename
-        return
+        return filename
     }
 
     contents, err := ioutil.ReadFile(filename)
@@ -94,11 +80,10 @@ func compareChecksum(filename string, checksumRemote string, ch chan<- string) {
         sumString := fmt.Sprintf("%x", sum)
         // checksums don't match, mark for upload
         if sumString != checksumRemote {
-            ch <- filename
-            return
+            return filename
         }
         // Files matched
-        ch <- ""
-        return
+        return ""
     }
+    return filename
 }
diff --git a/main.go b/main.go
index 2825b172..36e43bb5 100644
--- a/main.go
+++ b/main.go
@@ -12,13 +12,20 @@ import (
 var wg = sync.WaitGroup{}
 var logger = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)
 
-// UploadCFG describes structure of upload configuration for the uploadWorker
+// UploadCFG - structure for the upload queue
 type UploadCFG struct {
     s3Service *s3.S3
     file      string
     site      Site
 }
 
+// ChecksumCFG - structure for the checksum comparison queue
+type ChecksumCFG struct {
+    UploadCFG      UploadCFG
+    filename       string
+    checksumRemote string
+}
+
 func main() {
     var config Config
     var configpath string
@@ -40,18 +47,33 @@ func main() {
         go uploadWorker(uploadCh)
     }
 
+    // Init checksum checker workers
+    checksumCh := make(chan ChecksumCFG)
+    for x := 0; x < 20; x++ {
+        go checksumWorker(checksumCh, uploadCh)
+    }
+
     // Start separate thread for each site
     wg.Add(len(config.Sites))
     for _, site := range config.Sites {
         site.UploadTimeout = config.UploadTimeout
-        go syncSite(site, uploadCh)
+        go syncSite(site, uploadCh, checksumCh)
     }
     wg.Wait()
 }
 
 func uploadWorker(uploadCh <-chan UploadCFG) {
     for cfg := range uploadCh {
-        uploadConfig := cfg
-        uploadFile(uploadConfig.s3Service, uploadConfig.file, uploadConfig.site)
+        uploadFile(cfg.s3Service, cfg.file, cfg.site)
+    }
+}
+
+func checksumWorker(checksumCh <-chan ChecksumCFG, uploadCh chan<- UploadCFG) {
+    for cfg := range checksumCh {
+        filename := compareChecksum(cfg.filename, cfg.checksumRemote)
+        if len(filename) > 0 {
+            // Add file to the upload queue
+            uploadCh <- UploadCFG{cfg.UploadCFG.s3Service, cfg.UploadCFG.file, cfg.UploadCFG.site}
+        }
     }
 }
diff --git a/s3sync.go b/s3sync.go
index 40c3d102..2fe8951b 100644
--- a/s3sync.go
+++ b/s3sync.go
@@ -125,11 +125,11 @@ func deleteFile(s3Service *s3.S3, bucketName string, s3Key string) {
     logger.Printf("removed s3 object: %s/%s\n", bucketName, s3Key)
 }
 
-func syncSite(site Site, uploadCh chan<- UploadCFG) {
+func syncSite(site Site, uploadCh chan<- UploadCFG, checksumCh chan<- ChecksumCFG) {
     s3Service := s3.New(getS3Session(site))
     awsItems, err := getAwsS3ItemMap(s3Service, site.Bucket)
 
-    deleteKeys, err := FilePathWalkDir(site, awsItems, s3Service, uploadCh)
+    deleteKeys, err := FilePathWalkDir(site, awsItems, s3Service, checksumCh)
 
     if err != nil {
         logger.Fatal(err)
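The patch above replaces the unbounded per-file goroutine fan-out (`go compareChecksum(...)` for every walked file) with a fixed pool of twenty checksum workers draining a channel, so in-flight work is bounded by the pool size rather than by the number of files. A minimal, self-contained sketch of that channel-fed worker-pool pattern; the names are illustrative, nothing here is taken from the patch:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	jobs := make(chan int)
	results := make(chan int)
	var wg sync.WaitGroup

	// A fixed pool of three workers drains the jobs channel, so at most
	// three jobs are in flight at once -- the queue bounds concurrency
	// and memory instead of spawning one goroutine per item.
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := range jobs {
				results <- j * j // stand-in for the real per-item work
			}
		}()
	}

	// Producer: enqueue work, then close channels once everything is done.
	go func() {
		for i := 1; i <= 5; i++ {
			jobs <- i
		}
		close(jobs)
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println(r)
	}
}
```

In the patch, `checksumWorker` plays the role of the squaring worker, forwarding files whose checksums differ to `uploadCh` rather than to a results channel.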
From 1ee69e149cf8a56cf8571fdd7e38b23a05c17578 Mon Sep 17 00:00:00 2001
From: Yevgeniy Valeyev
Date: Tue, 3 Sep 2019 00:33:50 +0200
Subject: [PATCH 2/8] Indicate the FS watcher startup in the log

---
 watcher.go | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/watcher.go b/watcher.go
index 52492a79..964d84ea 100644
--- a/watcher.go
+++ b/watcher.go
@@ -10,6 +10,8 @@ import (
 )
 
 func watch(s3Service *s3.S3, site Site, uploadCh chan<- UploadCFG) {
+    logger.Printf("starting the FS watcher for site %s\n", site.Bucket)
+
     w := watcher.New()
     w.FilterOps(watcher.Create, watcher.Write, watcher.Remove, watcher.Rename, watcher.Move)
 
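The two added lines land in `watch`, whose setup is only visible here as diff context. For orientation, a sketch of how the `radovskyb/watcher` library that `watcher.go` builds on is typically driven; the event loop below is an assumption based on that library's documented API, not code from this repository:

```go
package main

import (
	"log"
	"time"

	"github.com/radovskyb/watcher"
)

func main() {
	w := watcher.New()
	// Only react to the operations we care about.
	w.FilterOps(watcher.Create, watcher.Write, watcher.Remove)

	go func() {
		for {
			select {
			case event := <-w.Event:
				log.Println(event) // e.g. hand the path to an upload queue
			case err := <-w.Error:
				log.Fatalln(err)
			case <-w.Closed:
				return
			}
		}
	}()

	// Watch the current directory and everything beneath it.
	if err := w.AddRecursive("."); err != nil {
		log.Fatalln(err)
	}

	// Poll for filesystem changes every 100ms.
	if err := w.Start(time.Millisecond * 100); err != nil {
		log.Fatalln(err)
	}
}
```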
From 26cbc0f669fcaf424b3b3a0d2b8d8db27d1f203b Mon Sep 17 00:00:00 2001
From: Yevgeniy Valeyev
Date: Tue, 3 Sep 2019 00:50:24 +0200
Subject: [PATCH 3/8] Minor syntax fix

---
 main.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/main.go b/main.go
index 36e43bb5..afeca38f 100644
--- a/main.go
+++ b/main.go
@@ -73,7 +73,7 @@ func checksumWorker(checksumCh <-chan ChecksumCFG, uploadCh chan<- UploadCFG) {
         filename := compareChecksum(cfg.filename, cfg.checksumRemote)
         if len(filename) > 0 {
             // Add file to the upload queue
-            uploadCh <- UploadCFG{cfg.UploadCFG.s3Service, cfg.UploadCFG.file, cfg.UploadCFG.site}
+            uploadCh <- cfg.UploadCFG
         }
     }
 }

From 4b517df8387168a5315a04342b92c5cf3249c205 Mon Sep 17 00:00:00 2001
From: Yevgeniy Valeyev
Date: Tue, 3 Sep 2019 19:20:17 +0200
Subject: [PATCH 4/8] Memory usage optimisation by improving checksum validation and upload procedures

---
 config.go     | 27 ++++++++----------
 localfiles.go | 79 +++++++++++++++++++++++++++++++++++++++++++--------
 main.go       |  5 +++-
 s3sync.go     | 55 +++++++++++++----------------------
 4 files changed, 102 insertions(+), 64 deletions(-)

diff --git a/config.go b/config.go
index 866fb9a9..3f9002b0 100644
--- a/config.go
+++ b/config.go
@@ -2,31 +2,28 @@ package main
 
 import (
     "os"
-    "time"
 
     "gopkg.in/yaml.v2"
 )
 
 // Site is an option for backing up data to S3
 type Site struct {
-    LocalPath       string        `yaml:"local_path"`
-    Bucket          string        `yaml:"bucket"`
-    BucketPath      string        `yaml:"bucket_path"`
-    BucketRegion    string        `yaml:"bucket_region"`
-    StorageClass    string        `yaml:"storage_class"`
-    AccessKey       string        `yaml:"access_key"`
-    SecretAccessKey string        `yaml:"secret_access_key"`
-    RetireDeleted   bool          `yaml:"retire_deleted"`
-    Exclusions      []string      `yaml:",flow"`
-    UploadTimeout   time.Duration `yaml:"upload_timeout"`
+    LocalPath       string   `yaml:"local_path"`
+    Bucket          string   `yaml:"bucket"`
+    BucketPath      string   `yaml:"bucket_path"`
+    BucketRegion    string   `yaml:"bucket_region"`
+    StorageClass    string   `yaml:"storage_class"`
+    AccessKey       string   `yaml:"access_key"`
+    SecretAccessKey string   `yaml:"secret_access_key"`
+    RetireDeleted   bool     `yaml:"retire_deleted"`
+    Exclusions      []string `yaml:",flow"`
 }
 
 // Config structure - contains a list of Site options
 type Config struct {
-    UploadTimeout     time.Duration `yaml:"upload_timeout"`
-    UploadQueueBuffer int           `yaml:"upload_queue_buffer"`
-    UploadWorkers     int           `yaml:"upload_workers"`
-    Sites             []Site        `yaml:",flow"`
+    UploadQueueBuffer int    `yaml:"upload_queue_buffer"`
+    UploadWorkers     int    `yaml:"upload_workers"`
+    Sites             []Site `yaml:",flow"`
 }
 
 func configProcessError(err error) {
diff --git a/localfiles.go b/localfiles.go
index 2e5d17fb..6e807208 100644
--- a/localfiles.go
+++ b/localfiles.go
@@ -2,11 +2,12 @@ package main
 
 import (
     "crypto/md5"
-    "fmt"
-    "io/ioutil"
+    "encoding/hex"
+    "io"
     "os"
     "path/filepath"
     "regexp"
+    "strconv"
 
     "github.com/aws/aws-sdk-go/service/s3"
 )
@@ -70,20 +71,74 @@ func FilePathWalkDir(site Site, awsItems map[string]string, s3Service *s3.S3, ch
 }
 
 func compareChecksum(filename string, checksumRemote string) string {
+    var sumOfSums []byte
+    var parts int
+    var finalSum []byte
+    chunkSize := int64(5 * 1024 * 1024)
+
     if checksumRemote == "" {
         return filename
     }
 
-    contents, err := ioutil.ReadFile(filename)
-    if err == nil {
-        sum := md5.Sum(contents)
-        sumString := fmt.Sprintf("%x", sum)
-        // checksums don't match, mark for upload
-        if sumString != checksumRemote {
-            return filename
-        }
-        // Files matched
+    file, err := os.Open(filename)
+    if err != nil {
+        logger.Fatal(err)
         return ""
     }
-    return filename
+    defer file.Close()
+
+    dataSize, err := file.Seek(0, io.SeekEnd)
+    if err != nil {
+        logger.Fatal(err)
+        return ""
+    }
+
+    for start := int64(0); start < dataSize; start += chunkSize {
+        length := chunkSize
+        if start+chunkSize > dataSize {
+            length = dataSize - start
+        }
+        sum, err := chunkMd5Sum(file, start, length)
+        if err != nil {
+            logger.Fatal(err)
+            return ""
+        }
+        sumOfSums = append(sumOfSums, sum...)
+        parts++
+    }
+
+    if parts == 1 {
+        finalSum = sumOfSums
+    } else {
+        h := md5.New()
+        _, err := h.Write(sumOfSums)
+        if err != nil {
+            logger.Fatal(err)
+            return ""
+        }
+        finalSum = h.Sum(nil)
+    }
+
+    sumHex := hex.EncodeToString(finalSum)
+
+    if parts > 1 {
+        sumHex += "-" + strconv.Itoa(parts)
+    }
+
+    if sumHex != checksumRemote {
+        // checksums don't match, mark for upload
+        return filename
+    }
+    // Files matched
+    return ""
+}
+
+func chunkMd5Sum(file io.ReadSeeker, start int64, length int64) ([]byte, error) {
+    file.Seek(start, io.SeekStart)
+    h := md5.New()
+    if _, err := io.CopyN(h, file, length); err != nil {
+        return nil, err
+    }
+
+    return h.Sum(nil), nil
 }
diff --git a/main.go b/main.go
index afeca38f..68188d0f 100644
--- a/main.go
+++ b/main.go
@@ -56,7 +56,10 @@ func main() {
     // Start separate thread for each site
     wg.Add(len(config.Sites))
     for _, site := range config.Sites {
-        site.UploadTimeout = config.UploadTimeout
+        // Set default value for StorageClass
+        if site.StorageClass == "" {
+            site.StorageClass = "STANDARD"
+        }
         go syncSite(site, uploadCh, checksumCh)
     }
     wg.Wait()
diff --git a/s3sync.go b/s3sync.go
index 2fe8951b..7edbf857 100644
--- a/s3sync.go
+++ b/s3sync.go
@@ -1,18 +1,16 @@
 package main
 
 import (
-    "context"
     "os"
     "path/filepath"
-    "strings"
     "time"
 
     "github.com/aws/aws-sdk-go/aws"
     "github.com/aws/aws-sdk-go/aws/awserr"
     "github.com/aws/aws-sdk-go/aws/credentials"
-    "github.com/aws/aws-sdk-go/aws/request"
     "github.com/aws/aws-sdk-go/aws/session"
     "github.com/aws/aws-sdk-go/service/s3"
+    "github.com/aws/aws-sdk-go/service/s3/s3manager"
 )
 
 func generateS3Key(bucketPath string, root string, path string) string {
@@ -37,17 +35,20 @@ func getS3Service(site Site) *s3.S3 {
     return s3.New(getS3Session(site))
 }
 
-func getAwsS3ItemMap(s3Service *s3.S3, bucketName string) (map[string]string, error) {
-    var loi s3.ListObjectsInput
+func getAwsS3ItemMap(s3Service *s3.S3, site Site) (map[string]string, error) {
     var items = make(map[string]string)
 
-    loi.SetBucket(bucketName)
-    obj, err := s3Service.ListObjects(&loi)
+    obj, err := s3Service.ListObjects(&s3.ListObjectsInput{Bucket: aws.String(site.Bucket)})
 
     if err == nil {
         for _, s3obj := range obj.Contents {
-            eTag := strings.Trim(*(s3obj.ETag), "\"")
-            items[*(s3obj.Key)] = eTag
+            if aws.StringValue(s3obj.StorageClass) != site.StorageClass {
+                logger.Printf("storage class does not match, marking for re-upload: %s", aws.StringValue(s3obj.Key))
+                items[aws.StringValue(s3obj.Key)] = "none"
+            } else {
+                eTag := aws.StringValue(s3obj.ETag)
+                items[aws.StringValue(s3obj.Key)] = eTag
+            }
         }
         return items, nil
     }
@@ -56,32 +57,18 @@ func getAwsS3ItemMap(s3Service *s3.S3, bucketName string) (map[string]string, er
 }
 
 func uploadFile(s3Service *s3.S3, file string, site Site) {
-    var cancelFn func()
-
-    ctx := context.Background()
-
-    if site.UploadTimeout > 0 {
-        ctx, cancelFn = context.WithTimeout(ctx, site.UploadTimeout)
-    }
-
-    if cancelFn != nil {
-        defer cancelFn()
-    }
-
-    // Set default value for StorageClass, available values are here
-    // https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-class-intro.html#sc-compare
-    if site.StorageClass == "" {
-        site.StorageClass = "STANDARD"
-    }
-
     s3Key := generateS3Key(site.BucketPath, site.LocalPath, file)
+    uploader := s3manager.NewUploader(getS3Session(site), func(u *s3manager.Uploader) {
+        u.PartSize = 5 * 1024 * 1024
+        u.Concurrency = 5
+    })
 
     f, fileErr := os.Open(file)
     if fileErr != nil {
         logger.Printf("failed to open file %q, %v", file, fileErr)
     } else {
-        _, err := s3Service.PutObjectWithContext(ctx, &s3.PutObjectInput{
+        result, err := uploader.Upload(&s3manager.UploadInput{
             Bucket: aws.String(site.Bucket),
             Key:    aws.String(s3Key),
             Body:   f,
@@ -89,16 +76,12 @@ func uploadFile(s3Service *s3.S3, file string, site Site) {
         })
 
         if err != nil {
-            if aerr, ok := err.(awserr.Error); ok && aerr.Code() == request.CanceledErrorCode {
-                logger.Printf("upload canceled due to timeout, %v\n", err)
-            } else {
-                logger.Printf("failed to upload object, %v\n", err)
-            }
-            os.Exit(1)
+            logger.Printf("failed to upload object, %v\n", err)
         }
 
-        logger.Printf("successfully uploaded file: %s/%s\n", site.Bucket, s3Key)
+        logger.Printf("successfully uploaded file: %s\n", result.Location)
     }
+    defer f.Close()
 }
 
 func deleteFile(s3Service *s3.S3, bucketName string, s3Key string) {
@@ -128,7 +111,7 @@ func deleteFile(s3Service *s3.S3, bucketName string, s3Key string) {
 
 func syncSite(site Site, uploadCh chan<- UploadCFG, checksumCh chan<- ChecksumCFG) {
     s3Service := s3.New(getS3Session(site))
-    awsItems, err := getAwsS3ItemMap(s3Service, site.Bucket)
+    awsItems, err := getAwsS3ItemMap(s3Service, site)
 
     deleteKeys, err := FilePathWalkDir(site, awsItems, s3Service, checksumCh)
 
     if err != nil {
         logger.Fatal(err)
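The `compareChecksum` rewrite above reproduces S3's multipart ETag convention: for a single-part, non-KMS-encrypted upload the ETag is the hex MD5 of the object, while for a multipart upload it is the hex MD5 of the concatenated per-part MD5 digests, suffixed with `-<number of parts>`. That is why the 5 MiB `chunkSize` deliberately matches the uploader's `PartSize`; a different part size would yield a different ETag and force needless re-uploads. AWS does not document the ETag as a stable checksum contract, so treat this as a convention rather than a guarantee. A standalone sketch of the computation, with illustrative names not taken from the patch:

```go
package main

import (
	"bytes"
	"crypto/md5"
	"encoding/hex"
	"fmt"
	"io"
)

// expectedETag mirrors what compareChecksum reconstructs: an MD5 per
// part, then the MD5 of the concatenated part digests plus "-<parts>".
func expectedETag(r io.Reader, partSize int64) (string, error) {
	var partSums []byte
	parts := 0
	for {
		h := md5.New()
		n, err := io.CopyN(h, r, partSize)
		if n > 0 {
			partSums = append(partSums, h.Sum(nil)...)
			parts++
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			return "", err
		}
	}
	// A single part means a plain (non-multipart) upload: no suffix.
	if parts == 1 {
		return hex.EncodeToString(partSums), nil
	}
	final := md5.Sum(partSums)
	return fmt.Sprintf("%s-%d", hex.EncodeToString(final[:]), parts), nil
}

func main() {
	// 12 MiB of zeros with a 5 MiB part size -> three parts, "-3" suffix.
	data := bytes.NewReader(make([]byte, 12*1024*1024))
	etag, _ := expectedETag(data, 5*1024*1024)
	fmt.Println(etag)
}
```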
From e060ef520201a4311f8da2e14c47f04976c3140b Mon Sep 17 00:00:00 2001
From: Yevgeniy Valeyev
Date: Tue, 3 Sep 2019 19:23:23 +0200
Subject: [PATCH 5/8] Updated readme

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 2d2c6cf1..5ed0d6ab 100644
--- a/README.md
+++ b/README.md
@@ -15,6 +15,7 @@ sites:
   - local_path: /local/path1
     bucket: backup-bucket-path1
     bucket_region: us-east-1
+    storage_class: STANDARD_IA
     access_key: AKIAI44QH8DHBEXAMPLE
     secret_access_key: je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY
     exclusions:
@@ -35,7 +36,6 @@ sites:
 
 | Variable | Description | Default | Required |
 |----------|-------------|---------|----------|
-| upload_timeout | Timeout for S3 upload | n/a | no |
 | upload_queue_buffer | Number of elements in the upload queue waiting for processing, might improve performance, however, increases memory usage | `0` | no |
 | upload_workers | Number of upload workers for the service | 10 | no |
 

From 79d8a29cf6598fdafe59730f4e03fd64d47162c7 Mon Sep 17 00:00:00 2001
From: Yevgeniy Valeyev
Date: Tue, 3 Sep 2019 19:33:54 +0200
Subject: [PATCH 6/8] Properly fetch ETag

---
 s3sync.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/s3sync.go b/s3sync.go
index 7edbf857..ccf05a79 100644
--- a/s3sync.go
+++ b/s3sync.go
@@ -3,6 +3,7 @@ package main
 import (
     "os"
     "path/filepath"
+    "strings"
     "time"
 
     "github.com/aws/aws-sdk-go/aws"
@@ -46,8 +47,7 @@ func getAwsS3ItemMap(s3Service *s3.S3, site Site) (map[string]string, error) {
             logger.Printf("storage class does not match, marking for re-upload: %s", aws.StringValue(s3obj.Key))
             items[aws.StringValue(s3obj.Key)] = "none"
         } else {
-            eTag := aws.StringValue(s3obj.ETag)
-            items[aws.StringValue(s3obj.Key)] = eTag
+            items[aws.StringValue(s3obj.Key)] = strings.Trim(*(s3obj.ETag), "\"")
         }
     }
     return items, nil
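One caveat around `getAwsS3ItemMap`, which patches 4 and 6 touch but do not change structurally: a plain `ListObjects` call returns at most 1,000 keys, so larger buckets would be only partially indexed. The SDK provides a paginated variant; below is a hedged sketch of the same map-building with `ListObjectsPages` (the `listAllItems` name is hypothetical, not from this repository):

```go
package main

import (
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

// listAllItems builds the same key->ETag map as getAwsS3ItemMap, but
// pages through the bucket so listings beyond 1,000 keys are complete.
func listAllItems(s3Service *s3.S3, bucket string) (map[string]string, error) {
	items := make(map[string]string)
	err := s3Service.ListObjectsPages(&s3.ListObjectsInput{
		Bucket: aws.String(bucket),
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		for _, obj := range page.Contents {
			// ETags arrive wrapped in literal double quotes; strip them
			// before comparing against locally computed checksums.
			items[aws.StringValue(obj.Key)] = strings.Trim(aws.StringValue(obj.ETag), "\"")
		}
		return true // keep fetching pages until the listing is exhausted
	})
	return items, err
}
```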
From 3ad705983cac1889886b033b14065097dcd8b494 Mon Sep 17 00:00:00 2001
From: Yevgeniy Valeyev
Date: Tue, 3 Sep 2019 20:00:05 +0200
Subject: [PATCH 7/8] Don't use UploadOutput

---
 s3sync.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/s3sync.go b/s3sync.go
index ccf05a79..391c9a52 100644
--- a/s3sync.go
+++ b/s3sync.go
@@ -68,7 +68,7 @@ func uploadFile(s3Service *s3.S3, file string, site Site) {
     if fileErr != nil {
         logger.Printf("failed to open file %q, %v", file, fileErr)
     } else {
-        result, err := uploader.Upload(&s3manager.UploadInput{
+        _, err := uploader.Upload(&s3manager.UploadInput{
             Bucket: aws.String(site.Bucket),
             Key:    aws.String(s3Key),
             Body:   f,
@@ -79,7 +79,7 @@ func uploadFile(s3Service *s3.S3, file string, site Site) {
             logger.Printf("failed to upload object, %v\n", err)
         }
 
-        logger.Printf("successfully uploaded file: %s\n", result.Location)
+        logger.Printf("successfully uploaded file: %s/%s\n", site.Bucket, s3Key)
     }
     defer f.Close()
 }
From df61e988c44fd0116f836551af7c39ad75bce0af Mon Sep 17 00:00:00 2001
From: Yevgeniy Valeyev
Date: Tue, 3 Sep 2019 20:00:15 +0200
Subject: [PATCH 8/8] Updated readme

---
 README.md | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 5ed0d6ab..3e642776 100644
--- a/README.md
+++ b/README.md
@@ -2,9 +2,18 @@
 
 ## Description
 
-The tool is aimed to sync data into S3 storage service for multiple _sites_ (path + bucket combination).
+The `s3sync-service` tool asynchronously syncs data to the S3 storage service for multiple _sites_ (path + bucket combinations).
 
-On start, the `s3sync-service` compares local directory contents with S3 (using checksums/ETag) - copies new files and removes files deleted locally from S3 storage (if `retire_deleted` is set to `true`). Once the initial sync is over the `s3sync-service` start watching the specified local directories and subdirectories for changes in order to perform real-time sync to S3.
+On start, the `s3sync-service` compares local directory contents with S3 (matching checksums against ETags and validating the StorageClass) - it copies new files and removes files deleted locally from S3 storage (if `retire_deleted` is set to `true`). Once the initial sync is over, the `s3sync-service` starts watching the specified local directories and subdirectories for changes in order to perform real-time sync to S3.
+
+## Command line arguments
+
+```bash
+> ./s3sync-service -h
+Usage of ./s3sync-service:
+  -c string
+        Path to the config.yml (default "config.yml")
+```
 
 ## Configuration
 
@@ -32,6 +41,7 @@ sites:
     exclusions:
       - "[Tt]humbs.db"
 ```
+
 ### Generic configuration options
 
 | Variable | Description | Default | Required |