Merge pull request #1 from mazay/devel
Memory usage optimisation + asynchronous processing
mazay authored Sep 3, 2019
2 parents 0777b40 + df61e98 commit e114d14
Showing 6 changed files with 147 additions and 90 deletions.
16 changes: 13 additions & 3 deletions README.md
@@ -2,9 +2,18 @@
 
 ## Description
 
-The tool is aimed to sync data into S3 storage service for multiple _sites_ (path + bucket combination).
+The `s3sync-service` tool asynchronously syncs data to the S3 storage service for multiple _sites_ (path + bucket combinations).
 
-On start, the `s3sync-service` compares local directory contents with S3 (using checksums/ETag) - copies new files and removes files deleted locally from S3 storage (if `retire_deleted` is set to `true`). Once the initial sync is over the `s3sync-service` start watching the specified local directories and subdirectories for changes in order to perform real-time sync to S3.
+On start, the `s3sync-service` compares local directory contents with S3 (matching checksums against ETags and validating the StorageClass) - it copies new files and removes files deleted locally from S3 storage (if `retire_deleted` is set to `true`). Once the initial sync is over, the `s3sync-service` starts watching the specified local directories and subdirectories for changes in order to perform real-time sync to S3.
+
+## Command line arguments
+
+```bash
+> ./s3sync-service -h
+Usage of ./s3sync-service:
+  -c string
+    	Path to the config.yml (default "config.yml")
+```
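For example, to run the service against a config stored outside the working directory (the path below is illustrative, not from the repository):

```bash
./s3sync-service -c /etc/s3sync-service/config.yml
```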

## Configuration

@@ -15,6 +24,7 @@ sites:
 - local_path: /local/path1
   bucket: backup-bucket-path1
   bucket_region: us-east-1
+  storage_class: STANDARD_IA
   access_key: AKIAI44QH8DHBEXAMPLE
   secret_access_key: je7MtGbClwBF/2Zp9Utk/h3yCo8nvbEXAMPLEKEY
   exclusions:
@@ -31,11 +41,11 @@
   exclusions:
   - "[Tt]humbs.db"
 ```
 ### Generic configuration options
 | Variable | Description | Default | Required |
 |----------|-------------|---------|----------|
-| upload_timeout | Timeout for S3 upload | n/a | no |
+| upload_queue_buffer | Number of elements in the upload queue waiting for processing; might improve performance, but increases memory usage | `0` | no |
 | upload_workers | Number of upload workers for the service | 10 | no |
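These generic options sit at the top level of `config.yml`, alongside `sites` - a minimal sketch with illustrative values, following the yaml tags on the `Config` struct in `config.go` below:

```yaml
upload_queue_buffer: 100
upload_workers: 20
sites:
- local_path: /local/path1
  bucket: backup-bucket-path1
  bucket_region: us-east-1
```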

27 changes: 12 additions & 15 deletions config.go
@@ -2,31 +2,28 @@ package main
 
 import (
 	"os"
-	"time"
 
 	"gopkg.in/yaml.v2"
 )
 
 // Site is an option for backing up data to S3
 type Site struct {
-	LocalPath       string        `yaml:"local_path"`
-	Bucket          string        `yaml:"bucket"`
-	BucketPath      string        `yaml:"bucket_path"`
-	BucketRegion    string        `yaml:"bucket_region"`
-	StorageClass    string        `yaml:"storage_class"`
-	AccessKey       string        `yaml:"access_key"`
-	SecretAccessKey string        `yaml:"secret_access_key"`
-	RetireDeleted   bool          `yaml:"retire_deleted"`
-	Exclusions      []string      `yaml:",flow"`
-	UploadTimeout   time.Duration `yaml:"upload_timeout"`
+	LocalPath       string   `yaml:"local_path"`
+	Bucket          string   `yaml:"bucket"`
+	BucketPath      string   `yaml:"bucket_path"`
+	BucketRegion    string   `yaml:"bucket_region"`
+	StorageClass    string   `yaml:"storage_class"`
+	AccessKey       string   `yaml:"access_key"`
+	SecretAccessKey string   `yaml:"secret_access_key"`
+	RetireDeleted   bool     `yaml:"retire_deleted"`
+	Exclusions      []string `yaml:",flow"`
 }
 
 // Config structure - contains a list of Site options
 type Config struct {
-	UploadTimeout     time.Duration `yaml:"upload_timeout"`
-	UploadQueueBuffer int           `yaml:"upload_queue_buffer"`
-	UploadWorkers     int           `yaml:"upload_workers"`
-	Sites             []Site        `yaml:",flow"`
+	UploadQueueBuffer int    `yaml:"upload_queue_buffer"`
+	UploadWorkers     int    `yaml:"upload_workers"`
+	Sites             []Site `yaml:",flow"`
 }
 
 func configProcessError(err error) {
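For context, this is roughly how a `Config` like the one above gets populated with `gopkg.in/yaml.v2`; the `readConfig` helper is a sketch for illustration (it assumes the `Config` type from this file), not necessarily the repository's actual loader:

```go
package main

import (
	"io/ioutil"

	"gopkg.in/yaml.v2"
)

// readConfig is a hypothetical helper: read the YAML file and
// unmarshal it into Config via the struct's yaml tags.
func readConfig(path string) (*Config, error) {
	data, err := ioutil.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var config Config
	if err := yaml.Unmarshal(data, &config); err != nil {
		return nil, err
	}
	return &config, nil
}
```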
102 changes: 71 additions & 31 deletions localfiles.go
@@ -2,11 +2,12 @@ package main
 
 import (
 	"crypto/md5"
-	"fmt"
-	"io/ioutil"
+	"encoding/hex"
+	"io"
 	"os"
 	"path/filepath"
 	"regexp"
+	"strconv"
 
 	"github.com/aws/aws-sdk-go/service/s3"
 )
@@ -36,11 +37,9 @@ func checkIfExcluded(path string, exclusions []string) bool {
 }
 
 // FilePathWalkDir walks through the directory and all subdirectories, returning a list of files for upload and a list of files to be deleted from S3
-func FilePathWalkDir(site Site, awsItems map[string]string, s3Service *s3.S3, uploadCh chan<- UploadCFG) ([]string, error) {
+func FilePathWalkDir(site Site, awsItems map[string]string, s3Service *s3.S3, checksumCh chan<- ChecksumCFG) ([]string, error) {
 	var deleteKeys []string
 	var localS3Keys []string
-	var filescount int
-	ch := make(chan string)
 
 	err := filepath.Walk(site.LocalPath, func(path string, info os.FileInfo, err error) error {
 		if err != nil {
@@ -52,26 +51,15 @@ func FilePathWalkDir(site Site, awsItems map[string]string, s3Service *s3.S3, uploadCh chan<- UploadCFG) ([]string, error) {
 			if excluded {
 				logger.Printf("skipping without errors: %+v \n", path)
 			} else {
-				filescount++
 				s3Key := generateS3Key(site.BucketPath, site.LocalPath, path)
 				localS3Keys = append(localS3Keys, s3Key)
 				checksumRemote, _ := awsItems[s3Key]
-				go compareChecksum(path, checksumRemote, ch)
+				checksumCh <- ChecksumCFG{UploadCFG{s3Service, path, site}, path, checksumRemote}
 			}
 		}
 		return nil
 	})
 
-	// Wait for checksums to be compared
-	var filename string
-	for i := 0; i < filescount; i++ {
-		filename = <-ch
-		if len(filename) > 0 {
-			// Add file to the upload queue
-			uploadCh <- UploadCFG{s3Service, filename, site}
-		}
-	}
-
 	// Generate a list of deleted files
 	for key := range awsItems {
 		if !checkIfInList(key, localS3Keys) {
@@ -82,23 +70,75 @@
 	return deleteKeys, err
 }

-func compareChecksum(filename string, checksumRemote string, ch chan<- string) {
+func compareChecksum(filename string, checksumRemote string) string {
+	var sumOfSums []byte
+	var parts int
+	var finalSum []byte
+	chunkSize := int64(5 * 1024 * 1024)
+
 	if checksumRemote == "" {
-		ch <- filename
-		return
+		return filename
 	}
 
-	contents, err := ioutil.ReadFile(filename)
-	if err == nil {
-		sum := md5.Sum(contents)
-		sumString := fmt.Sprintf("%x", sum)
-		// checksums don't match, mark for upload
-		if sumString != checksumRemote {
-			ch <- filename
-			return
-		}
-		// Files matched
-		ch <- ""
-		return
-	}
+	file, err := os.Open(filename)
+	if err != nil {
+		logger.Fatal(err)
+		return ""
+	}
+	defer file.Close()
+
+	dataSize, err := file.Seek(0, io.SeekEnd)
+	if err != nil {
+		logger.Fatal(err)
+		return ""
+	}
+
+	for start := int64(0); start < dataSize; start += chunkSize {
+		length := chunkSize
+		if start+chunkSize > dataSize {
+			length = dataSize - start
+		}
+		sum, err := chunkMd5Sum(file, start, length)
+		if err != nil {
+			logger.Fatal(err)
+			return ""
+		}
+		sumOfSums = append(sumOfSums, sum...)
+		parts++
+	}
+
+	if parts == 1 {
+		finalSum = sumOfSums
+	} else {
+		h := md5.New()
+		_, err := h.Write(sumOfSums)
+		if err != nil {
+			logger.Fatal(err)
+			return ""
+		}
+		finalSum = h.Sum(nil)
+	}
+
+	sumHex := hex.EncodeToString(finalSum)
+
+	if parts > 1 {
+		sumHex += "-" + strconv.Itoa(parts)
+	}
+
+	if sumHex != checksumRemote {
+		// checksums don't match, mark for upload
+		return filename
+	}
+	// Files matched
+	return ""
 }
+
+func chunkMd5Sum(file io.ReadSeeker, start int64, length int64) ([]byte, error) {
+	file.Seek(start, io.SeekStart)
+	h := md5.New()
+	if _, err := io.CopyN(h, file, length); err != nil {
+		return nil, err
+	}
+
+	return h.Sum(nil), nil
+}
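For illustration: an S3 multipart ETag is the MD5 of the concatenated per-part MD5 digests, suffixed with the part count, which is what the code above reproduces chunk by chunk. A self-contained sketch of the same arithmetic on in-memory data (nothing here is taken from the repository; the 5 MiB chunk size matches the code above):

```go
package main

import (
	"bytes"
	"crypto/md5"
	"encoding/hex"
	"fmt"
)

func main() {
	const chunkSize = 5 * 1024 * 1024
	data := bytes.Repeat([]byte{'a'}, 12*1024*1024) // 12 MiB -> 3 chunks

	var sumOfSums []byte
	parts := 0
	for start := 0; start < len(data); start += chunkSize {
		end := start + chunkSize
		if end > len(data) {
			end = len(data)
		}
		sum := md5.Sum(data[start:end]) // MD5 of each chunk
		sumOfSums = append(sumOfSums, sum[:]...)
		parts++
	}

	// Final ETag: MD5 over the concatenated chunk digests, plus "-<parts>".
	finalSum := md5.Sum(sumOfSums)
	fmt.Printf("%s-%d\n", hex.EncodeToString(finalSum[:]), parts)
}
```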
35 changes: 30 additions & 5 deletions main.go
@@ -12,13 +12,20 @@ import (
 var wg = sync.WaitGroup{}
 var logger = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)
 
-// UploadCFG describes structure of upload configuration for the uploadWorker
+// UploadCFG - structure for the upload queue
 type UploadCFG struct {
 	s3Service *s3.S3
 	file      string
 	site      Site
 }
 
+// ChecksumCFG - structure for the checksum comparison queue
+type ChecksumCFG struct {
+	UploadCFG      UploadCFG
+	filename       string
+	checksumRemote string
+}
+
 func main() {
 	var config Config
 	var configpath string
@@ -40,18 +47,36 @@
 		go uploadWorker(uploadCh)
 	}
 
+	// Init checksum checker workers
+	checksumCh := make(chan ChecksumCFG)
+	for x := 0; x < 20; x++ {
+		go checksumWorker(checksumCh, uploadCh)
+	}
+
 	// Start separate thread for each site
 	wg.Add(len(config.Sites))
 	for _, site := range config.Sites {
-		site.UploadTimeout = config.UploadTimeout
-		go syncSite(site, uploadCh)
+		// Set default value for StorageClass
+		if site.StorageClass == "" {
+			site.StorageClass = "STANDARD"
+		}
+		go syncSite(site, uploadCh, checksumCh)
 	}
 	wg.Wait()
 }
 
 func uploadWorker(uploadCh <-chan UploadCFG) {
 	for cfg := range uploadCh {
-		uploadConfig := cfg
-		uploadFile(uploadConfig.s3Service, uploadConfig.file, uploadConfig.site)
+		uploadFile(cfg.s3Service, cfg.file, cfg.site)
 	}
 }
 
+func checksumWorker(checksumCh <-chan ChecksumCFG, uploadCh chan<- UploadCFG) {
+	for cfg := range checksumCh {
+		filename := compareChecksum(cfg.filename, cfg.checksumRemote)
+		if len(filename) > 0 {
+			// Add file to the upload queue
+			uploadCh <- cfg.UploadCFG
+		}
+	}
+}
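The creation of `uploadCh` itself is in a collapsed part of this diff; presumably it is a buffered channel sized by `upload_queue_buffer`, along the lines of this sketch (an assumption, in the same package as the code above, not the repository's verified code):

```go
// Assumption: the collapsed section sets up the upload queue roughly like this.
// A buffer > 0 lets the checksum workers keep queueing uploads while all upload
// workers are busy (more memory, better throughput); with 0, every send blocks
// until a worker is free.
uploadCh := make(chan UploadCFG, config.UploadQueueBuffer)
for x := 0; x < config.UploadWorkers; x++ {
	go uploadWorker(uploadCh)
}
```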