Skip to content

Commit

Permalink
Sending CSV to AWS (#96)
Browse files Browse the repository at this point in the history
Timeseries data is sent to AWS in CSV format
  • Loading branch information
local-minimum authored May 20, 2020
1 parent 7e976ea commit e39b988
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 42 deletions.
3 changes: 2 additions & 1 deletion cmd/rac/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/innosat-mats/rac-extract-payload/internal/awstools"
"github.com/innosat-mats/rac-extract-payload/internal/common"
"github.com/innosat-mats/rac-extract-payload/internal/exports"
"github.com/innosat-mats/rac-extract-payload/internal/extractors"
Expand Down Expand Up @@ -63,7 +64,7 @@ func getCallback(
return callback, teardown, nil
} else if toAws {
callback, teardown := exports.AWSS3CallbackFactory(
exports.AWSUpload,
awstools.AWSUpload,
project,
awsDescription,
!skipImages,
Expand Down
37 changes: 37 additions & 0 deletions internal/awstools/timeseries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package awstools

import (
"bytes"
"fmt"
"io"

"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// Timeseries holds csv-buffer and upload features
type Timeseries struct {
reader io.Reader
writer io.Writer
upload AWSUploadFunc
uploader *s3manager.Uploader
key string
}

func (ts *Timeseries) Write(data []byte) (int, error) {
if ts.writer == nil {
return 0, fmt.Errorf("Timeseries %v already closed", ts.key)
}
return ts.writer.Write(data)
}

// Close uploads data to aws
func (ts *Timeseries) Close() {
ts.writer = nil
ts.upload(ts.uploader, ts.key, ts.reader)
}

// NewTimeseries returns a timeseries that will invoke upload upon close
func NewTimeseries(upload AWSUploadFunc, uploader *s3manager.Uploader, key string) *Timeseries {
buf := bytes.NewBuffer([]byte{})
return &Timeseries{buf, buf, upload, uploader, key}
}
66 changes: 66 additions & 0 deletions internal/awstools/timeseries_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package awstools

import (
"io"
"io/ioutil"
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

func TestNewTimeseries(t *testing.T) {
sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("localhost")}))
upload := s3manager.NewUploader(sess)
var idxUp = 0
uploads := make(map[string]int)

var uploader = func(uploader *s3manager.Uploader, key string, bodyBuffer io.Reader) {
buf, _ := ioutil.ReadAll(bodyBuffer)
uploads[key] = len(buf)
idxUp++
}

ts := NewTimeseries(uploader, upload, "myfile")
// Write several times possible
for i := 0; i < 3; i++ {
n, err := ts.Write([]byte("test"))
if n != 4 || err != nil {
t.Errorf("Timeseries.Write() = %v %v, want 4 <nil>", n, err)
return
}
if idxUp != 0 {
t.Errorf("Timesereis.Write() initiated an unexpected upload %v", uploads)
return
}
}

// Uploads file upon closing
ts.Close()
if idxUp != 1 {
t.Errorf(
"Timeseries.Close() didn't upload one file (%v files sent) with content %v",
idxUp,
uploads,
)
}
l, ok := uploads["myfile"]
if !ok || l != 4*3 {
t.Errorf(
"Timeseries.Close() didn't upload 'myfile' with 12 bytes (%v, %v)",
l,
ok,
)
}

// Can't write after closing
n, err := ts.Write([]byte("hello"))
if n != 0 || err == nil {
t.Errorf(
"Timeseries.Write() should have written nothing and err after closing (%v, %v)",
n,
err,
)
}
}
28 changes: 28 additions & 0 deletions internal/awstools/uploader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package awstools

import (
"io"
"log"
"strings"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

const awsBucket = "mats-l0-artifacts"

// AWSUpload uploads file content to target bucket
func AWSUpload(uploader *s3manager.Uploader, key string, body io.Reader) {
_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(awsBucket),
Key: aws.String(strings.ReplaceAll(key, "\\", "/")),
Body: body,
})
if err != nil {
log.Printf("Failed to upload file %v, %v", key, err)
}

}

// AWSUploadFunc is the signature of an AWS upload function
type AWSUploadFunc func(uploader *s3manager.Uploader, key string, body io.Reader)
40 changes: 23 additions & 17 deletions internal/exports/aws_s3_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,42 +4,36 @@ import (
"bytes"
"fmt"
"image/png"
"io"
"log"
"os"
"path/filepath"
"strings"
"sync"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/innosat-mats/rac-extract-payload/internal/aez"
"github.com/innosat-mats/rac-extract-payload/internal/awstools"
"github.com/innosat-mats/rac-extract-payload/internal/common"
"github.com/innosat-mats/rac-extract-payload/internal/timeseries"
)

const awsBucket = "mats-l0-artifacts"
const awsS3Region = "eu-north-1"

// AWSUpload uploads file content to target bucket
func AWSUpload(uploader *s3manager.Uploader, key string, body io.Reader) {
_, err := uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(awsBucket),
Key: aws.String(strings.ReplaceAll(key, "\\", "/")),
Body: body,
})
if err != nil {
log.Printf("Failed to upload file %v, %v", key, err)
func csvAWSWriterFactoryCreator(
upload awstools.AWSUploadFunc,
uploader *s3manager.Uploader,
project string,
) timeseries.CSVFactory {
return func(pkg *common.DataRecord, stream timeseries.OutStream) (timeseries.CSVWriter, error) {
key := fmt.Sprintf("%v/%v.csv", project, stream.String())
return timeseries.NewCSV(awstools.NewTimeseries(upload, uploader, key), key), nil
}

}

// AWSUploadFunc is the signature of an AWS upload function
type AWSUploadFunc func(uploader *s3manager.Uploader, key string, body io.Reader)

// AWSS3CallbackFactory generates callbacks for writing to S3 instead of disk
func AWSS3CallbackFactory(
upload AWSUploadFunc,
upload awstools.AWSUploadFunc,
project string,
awsDescriptionPath string,
writeImages bool,
Expand All @@ -49,6 +43,9 @@ func AWSS3CallbackFactory(

sess := session.Must(session.NewSession(&aws.Config{Region: aws.String(awsS3Region)}))
uploader := s3manager.NewUploader(sess)
timeseriesCollection := timeseries.NewCollection(
csvAWSWriterFactoryCreator(upload, uploader, project),
)

if awsDescriptionPath != "" {
awsDescription, err := os.Open(awsDescriptionPath)
Expand Down Expand Up @@ -96,9 +93,18 @@ func AWSS3CallbackFactory(
}()
}
}

if writeTimeseries && pkg.Data != nil {
// Write to the dedicated target stream
err := timeseriesCollection.Write(&pkg)
if err != nil {
log.Println(err)
}
}
}

teardown := func() {
timeseriesCollection.CloseAll()
wg.Wait()
}

Expand Down
Loading

0 comments on commit e39b988

Please sign in to comment.