Commit 2e9025f

Merge pull request #2 from mazay/devel: JSON logging format

mazay authored Sep 6, 2019
2 parents e114d14 + 0112329

Showing 8 changed files with 65 additions and 26 deletions.
Dockerfile (2 changes: 1 addition & 1 deletion)

@@ -12,7 +12,7 @@ RUN go get -d -v ./...
 RUN go install -v ./...
 RUN go build

-RUN rm -rf /go/src/github.com /go/src/gopkg.in
+RUN rm -rf /go/src/github.com /go/src/gopkg.in /go/src/golang.org
 RUN rm *.go

 CMD ["./s3sync-service"]
README.md (1 change: 1 addition & 0 deletions)

@@ -46,6 +46,7 @@ sites:
 | Variable | Description | Default | Required |
 |----------|-------------|---------|----------|
+| loglevel | Logging level; valid options are `trace`, `debug`, `info`, `warn`, `error`, `fatal`, `panic`. With the level set to `trace` the logger outputs everything; with `debug`, everything except `trace`; and so on. | `info` | no |
 | upload_queue_buffer | Number of elements in the upload queue waiting for processing; may improve performance but increases memory usage | `0` | no |
 | upload_workers | Number of upload workers for the service | `10` | no |

config.go (1 change: 1 addition & 0 deletions)

@@ -21,6 +21,7 @@ type Site struct {

 // Config structure - contains list of Site options
 type Config struct {
+	LogLevel          string `yaml:"loglevel"`
 	UploadQueueBuffer int    `yaml:"upload_queue_buffer"`
 	UploadWorkers     int    `yaml:"upload_workers"`
 	Sites             []Site `yaml:",flow"`
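For context, a minimal sketch of how the YAML options documented in the README map onto this struct. It assumes the config is parsed with gopkg.in/yaml.v2 (the Dockerfile's cleanup of /go/src/gopkg.in hints at a yaml package, though the parsing code is outside this diff); the trimmed struct and sample values are illustrative only.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Trimmed copy of the Config struct above, for illustration.
type Config struct {
	LogLevel          string `yaml:"loglevel"`
	UploadQueueBuffer int    `yaml:"upload_queue_buffer"`
	UploadWorkers     int    `yaml:"upload_workers"`
}

func main() {
	raw := []byte("loglevel: debug\nupload_queue_buffer: 100\nupload_workers: 5\n")

	var cfg Config
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}
	// If loglevel were omitted, cfg.LogLevel would stay "" and initLogger
	// (logger.go below) would fall back to "info".
	fmt.Printf("level=%s buffer=%d workers=%d\n", cfg.LogLevel, cfg.UploadQueueBuffer, cfg.UploadWorkers)
}
```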
localfiles.go (16 changes: 9 additions & 7 deletions)

@@ -49,7 +49,7 @@ func FilePathWalkDir(site Site, awsItems map[string]string, s3Service *s3.S3, ch
 		if !info.IsDir() {
 			excluded := checkIfExcluded(path, site.Exclusions)
 			if excluded {
-				logger.Printf("skipping without errors: %+v \n", path)
+				logger.Debugf("skipping without errors: %+v", path)
 			} else {
 				s3Key := generateS3Key(site.BucketPath, site.LocalPath, path)
 				localS3Keys = append(localS3Keys, s3Key)
@@ -76,20 +76,22 @@ func compareChecksum(filename string, checksumRemote string) string {
 	var finalSum []byte
 	chunkSize := int64(5 * 1024 * 1024)

+	logger.Debugf("%s: comparing checksums", filename)
+
 	if checksumRemote == "" {
 		return filename
 	}

 	file, err := os.Open(filename)
 	if err != nil {
-		logger.Fatal(err)
+		logger.Error(err)
 		return ""
 	}
 	defer file.Close()

 	dataSize, err := file.Seek(0, io.SeekEnd)
 	if err != nil {
-		logger.Fatal(err)
+		logger.Error(err)
 		return ""
 	}

@@ -100,7 +102,7 @@ func compareChecksum(filename string, checksumRemote string) string {
 		}
 		sum, err := chunkMd5Sum(file, start, length)
 		if err != nil {
-			logger.Fatal(err)
+			logger.Error(err)
 			return ""
 		}
 		sumOfSums = append(sumOfSums, sum...)
@@ -113,7 +115,7 @@ func compareChecksum(filename string, checksumRemote string) string {
 	h := md5.New()
 	_, err := h.Write(sumOfSums)
 	if err != nil {
-		logger.Fatal(err)
+		logger.Error(err)
 		return ""
 	}
 	finalSum = h.Sum(nil)
@@ -126,10 +128,10 @@ func compareChecksum(filename string, checksumRemote string) string {
 	}

 	if sumHex != checksumRemote {
-		// checksums don't match, mark for upload
+		logger.Debugf("%s: checksums do not match, local checksum is %s, remote - %s", filename, sumHex, checksumRemote)
 		return filename
 	}
-	// Files matched
+	logger.Debugf("%s: checksums matched", filename)
 	return ""
 }
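The chunked hashing above follows the S3 multipart-upload ETag convention: for an object uploaded in parts, the ETag is the MD5 of the concatenated per-part MD5s, suffixed with "-" and the part count. A self-contained sketch of that computation, using the same 5 MiB chunk size; the function name and in-memory input are illustrative, not the project's API:

```go
package main

import (
	"crypto/md5"
	"fmt"
)

// multipartETag mimics how S3 derives ETags: plain MD5 for a single-part
// object, md5-of-part-md5s plus "-<parts>" for multipart uploads.
func multipartETag(data []byte, chunkSize int) string {
	if len(data) <= chunkSize {
		return fmt.Sprintf("%x", md5.Sum(data))
	}
	var sumOfSums []byte
	parts := 0
	for start := 0; start < len(data); start += chunkSize {
		end := start + chunkSize
		if end > len(data) {
			end = len(data)
		}
		sum := md5.Sum(data[start:end])
		sumOfSums = append(sumOfSums, sum[:]...)
		parts++
	}
	final := md5.Sum(sumOfSums)
	return fmt.Sprintf("%x-%d", final, parts)
}

func main() {
	data := make([]byte, 12*1024*1024) // 12 MiB, i.e. three 5 MiB parts
	fmt.Println(multipartETag(data, 5*1024*1024))
}
```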
logger.go (35 changes: 35 additions & 0 deletions)

@@ -0,0 +1,35 @@
package main

import (
	"os"

	"github.com/sirupsen/logrus"
)

var logger = logrus.New()

func initLogger(config Config) {
	logLevels := map[string]logrus.Level{
		"trace": logrus.TraceLevel,
		"debug": logrus.DebugLevel,
		"info":  logrus.InfoLevel,
		"warn":  logrus.WarnLevel,
		"error": logrus.ErrorLevel,
		"fatal": logrus.FatalLevel,
		"panic": logrus.PanicLevel,
	}

	// set default loglevel
	if config.LogLevel == "" {
		config.LogLevel = "info"
	}

	logger.SetFormatter(&logrus.JSONFormatter{})
	logger.SetOutput(os.Stdout)

	logger.SetLevel(logLevels[config.LogLevel])

	if config.LogLevel == "trace" || config.LogLevel == "debug" {
		logger.SetReportCaller(true)
	}
}
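Since the point of this PR is the JSON format, here is a standalone sketch of what the configured logger emits; the field names are logrus's JSONFormatter defaults, and the timestamp shown is invented:

```go
package main

import (
	"os"

	"github.com/sirupsen/logrus"
)

func main() {
	log := logrus.New()
	log.SetFormatter(&logrus.JSONFormatter{}) // same setup as initLogger
	log.SetOutput(os.Stdout)
	log.SetLevel(logrus.InfoLevel)

	log.Infof("successfully uploaded file: %s/%s", "my-bucket", "path/key")
	// Prints something like:
	// {"level":"info","msg":"successfully uploaded file: my-bucket/path/key","time":"2019-09-06T12:00:00Z"}
}
```

With SetReportCaller(true), logrus also adds "func" and "file" fields to every entry, which is why it is only enabled for the two most verbose levels.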
main.go (6 changes: 3 additions & 3 deletions)

@@ -2,15 +2,12 @@ package main

 import (
 	"flag"
-	"log"
-	"os"
 	"sync"

 	"github.com/aws/aws-sdk-go/service/s3"
 )

 var wg = sync.WaitGroup{}
-var logger = log.New(os.Stdout, "", log.Ldate|log.Ltime|log.Lshortfile)

 // UploadCFG - structure for the upload queue
 type UploadCFG struct {
@@ -37,6 +34,9 @@ func main() {
 	// Read config file
 	readFile(&config, configpath)

+	// init logger
+	initLogger(config)
+
 	// Init upload worker
 	if config.UploadWorkers == 0 {
 		config.UploadWorkers = 10
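The diff does not show the worker loop itself, but the UploadCFG queue and the default of 10 workers suggest a buffered-channel pool. A hypothetical sketch of how upload_workers and upload_queue_buffer plausibly interact; the struct body and worker logic here are invented for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// UploadCFG stands in for the real upload-queue element; fields invented.
type UploadCFG struct {
	file string
}

func main() {
	uploadWorkers := 10      // config.UploadWorkers default
	uploadQueueBuffer := 100 // config.UploadQueueBuffer; 0 means unbuffered

	uploadCh := make(chan UploadCFG, uploadQueueBuffer)
	var wg sync.WaitGroup

	// Fixed pool of workers draining the queue.
	for i := 0; i < uploadWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for job := range uploadCh {
				fmt.Printf("worker %d uploading %s\n", id, job.file)
			}
		}(i)
	}

	for _, f := range []string{"a.txt", "b.txt", "c.txt"} {
		uploadCh <- UploadCFG{file: f}
	}
	close(uploadCh)
	wg.Wait()
}
```

A buffer larger than zero lets the filesystem walk keep queueing files while all workers are busy, at the cost of holding more queued items in memory, which matches the README's note on upload_queue_buffer.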
s3sync.go (16 changes: 8 additions & 8 deletions)

@@ -44,7 +44,7 @@ func getAwsS3ItemMap(s3Service *s3.S3, site Site) (map[string]string, error) {
 	if err == nil {
 		for _, s3obj := range obj.Contents {
 			if aws.StringValue(s3obj.StorageClass) != site.StorageClass {
-				logger.Printf("storage class does not match, marking for re-upload: %s", aws.StringValue(s3obj.Key))
+				logger.Infof("storage class does not match, marking for re-upload: %s", aws.StringValue(s3obj.Key))
 				items[aws.StringValue(s3obj.Key)] = "none"
 			} else {
 				items[aws.StringValue(s3obj.Key)] = strings.Trim(*(s3obj.ETag), "\"")
@@ -66,7 +66,7 @@ func uploadFile(s3Service *s3.S3, file string, site Site) {
 	f, fileErr := os.Open(file)

 	if fileErr != nil {
-		logger.Printf("failed to open file %q, %v", file, fileErr)
+		logger.Errorf("failed to open file %q, %v", file, fileErr)
 	} else {
 		_, err := uploader.Upload(&s3manager.UploadInput{
 			Bucket: aws.String(site.Bucket),
@@ -76,10 +76,10 @@ func uploadFile(s3Service *s3.S3, file string, site Site) {
 		})

 		if err != nil {
-			logger.Printf("failed to upload object, %v\n", err)
+			logger.Errorf("failed to upload object, %v", err)
 		}

-		logger.Printf("successfully uploaded file: %s/%s\n", site.Bucket, s3Key)
+		logger.Infof("successfully uploaded file: %s/%s", site.Bucket, s3Key)
 	}
 	defer f.Close()
 }
@@ -95,17 +95,17 @@ func deleteFile(s3Service *s3.S3, bucketName string, s3Key string) {
 		if aerr, ok := err.(awserr.Error); ok {
 			switch aerr.Code() {
 			default:
-				logger.Println(aerr.Error())
+				logger.Errorln(aerr.Error())
 			}
 		} else {
 			// Print the error, cast err to awserr.Error to get the Code and
 			// Message from an error.
-			logger.Println(err.Error())
+			logger.Errorln(err.Error())
 		}
 		return
 	}

-	logger.Printf("removed s3 object: %s/%s\n", bucketName, s3Key)
+	logger.Infof("removed s3 object: %s/%s", bucketName, s3Key)
 }

 func syncSite(site Site, uploadCh chan<- UploadCFG, checksumCh chan<- ChecksumCFG) {
@@ -115,7 +115,7 @@ func syncSite(site Site, uploadCh chan<- UploadCFG, checksumCh chan<- ChecksumCF
 	deleteKeys, err := FilePathWalkDir(site, awsItems, s3Service, checksumCh)

 	if err != nil {
-		logger.Fatal(err)
+		logger.Errorln(err)
 	}

 	// Delete retired files
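For reference, a hedged sketch of building the key-to-ETag map the way getAwsS3ItemMap's loop suggests, assuming aws-sdk-go v1's ListObjectsPages pagination (the actual listing call sits outside this diff); the bucket name and storage class are placeholders:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

func main() {
	sess := session.Must(session.NewSession())
	svc := s3.New(sess)

	items := make(map[string]string)
	err := svc.ListObjectsPages(&s3.ListObjectsInput{
		Bucket: aws.String("my-bucket"), // placeholder bucket
	}, func(page *s3.ListObjectsOutput, lastPage bool) bool {
		for _, obj := range page.Contents {
			if aws.StringValue(obj.StorageClass) != "STANDARD" { // placeholder class
				items[aws.StringValue(obj.Key)] = "none" // force re-upload
			} else {
				// ETags come back quoted; trim so they compare with local md5s.
				items[aws.StringValue(obj.Key)] = strings.Trim(aws.StringValue(obj.ETag), "\"")
			}
		}
		return true // keep paginating
	})
	if err != nil {
		fmt.Println("list failed:", err)
		return
	}
	fmt.Printf("indexed %d objects\n", len(items))
}
```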
watcher.go (14 changes: 7 additions & 7 deletions)

@@ -10,7 +10,7 @@ import (
 )

 func watch(s3Service *s3.S3, site Site, uploadCh chan<- UploadCFG) {
-	logger.Printf("starting the FS watcher for site %s\n", site.Bucket)
+	logger.Printf("starting the FS watcher for site %s", site.Bucket)

 	w := watcher.New()
 	w.FilterOps(watcher.Create, watcher.Write, watcher.Remove, watcher.Rename, watcher.Move)
@@ -20,7 +20,7 @@ func watch(s3Service *s3.S3, site Site, uploadCh chan<- UploadCFG) {
 		select {
 		case event := <-w.Event:
 			if !event.IsDir() {
-				logger.Println(event)
+				logger.Infoln(event)
 				// Convert filepath to string
 				filepath := fmt.Sprint(event.Path)
 				if fmt.Sprint(event.Op) == "REMOVE" {
@@ -38,27 +38,27 @@ func watch(s3Service *s3.S3, site Site, uploadCh chan<- UploadCFG) {
 				} else {
 					excluded := checkIfExcluded(filepath, site.Exclusions)
 					if excluded {
-						logger.Printf("skipping without errors: %+v \n", filepath)
+						logger.Debugf("skipping without errors: %+v", filepath)
 					} else {
 						fileWatcher(s3Service, site, uploadCh, event, filepath)
 					}
 				}
 			}
 		case err := <-w.Error:
-			logger.Fatal(err)
+			logger.Errorln(err)
 		case <-w.Closed:
 			return
 		}
 	}
 	}()

 	if err := w.AddRecursive(site.LocalPath); err != nil {
-		logger.Fatal(err)
+		logger.Errorln(err)
 	}

 	// Start the watching process - it'll check for changes every 100ms.
 	if err := w.Start(time.Millisecond * 100); err != nil {
-		logger.Fatal(err)
+		logger.Errorln(err)
 	}
 }

Expand All @@ -71,7 +71,7 @@ func fileWatcher(s3Service *s3.S3, site Site, uploadCh chan<- UploadCFG, event w
file, _ := os.Stat(filepath)
mtime := file.ModTime()
if time.Now().Sub(mtime).Seconds() >= 30 {
logger.Printf("there were no writes to the file for 30 seconds, adding to the upload queue: %s", filepath)
logger.Debugf("there were no writes to the file for 30 seconds, adding to the upload queue: %s", filepath)
uploadCh <- UploadCFG{s3Service, filepath, site}
return
}
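The 30-second check above is a write-quiescence guard: a file is only queued once its modification time is old enough that writes have likely stopped. A small sketch of that predicate, assuming the intent described in the log message; the helper name is illustrative:

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// readyForUpload reports whether path has gone `quiet` without writes.
func readyForUpload(path string, quiet time.Duration) (bool, error) {
	info, err := os.Stat(path)
	if err != nil {
		return false, err
	}
	return time.Since(info.ModTime()) >= quiet, nil
}

func main() {
	ok, err := readyForUpload("example.txt", 30*time.Second)
	if err != nil {
		fmt.Println("stat failed:", err)
		return
	}
	fmt.Println("ready for upload:", ok)
}
```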
