diff --git a/cmd/rac/cli.go b/cmd/rac/cli.go index 0f16550..44d5889 100644 --- a/cmd/rac/cli.go +++ b/cmd/rac/cli.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/innosat-mats/rac-extract-payload/internal/awstools" "github.com/innosat-mats/rac-extract-payload/internal/common" "github.com/innosat-mats/rac-extract-payload/internal/exports" "github.com/innosat-mats/rac-extract-payload/internal/extractors" @@ -63,7 +64,7 @@ func getCallback( return callback, teardown, nil } else if toAws { callback, teardown := exports.AWSS3CallbackFactory( - exports.AWSUpload, + awstools.AWSUpload, project, awsDescription, !skipImages, diff --git a/internal/awstools/timeseries.go b/internal/awstools/timeseries.go new file mode 100644 index 0000000..8d96478 --- /dev/null +++ b/internal/awstools/timeseries.go @@ -0,0 +1,37 @@ +package awstools + +import ( + "bytes" + "fmt" + "io" + + "github.com/aws/aws-sdk-go/service/s3/s3manager" +) + +// Timeseries holds csv-buffer and upload features +type Timeseries struct { + reader io.Reader + writer io.Writer + upload AWSUploadFunc + uploader *s3manager.Uploader + key string +} + +func (ts *Timeseries) Write(data []byte) (int, error) { + if ts.writer == nil { + return 0, fmt.Errorf("Timeseries %v already closed", ts.key) + } + return ts.writer.Write(data) +} + +// Close uploads data to aws +func (ts *Timeseries) Close() { + ts.writer = nil + ts.upload(ts.uploader, ts.key, ts.reader) +} + +// NewTimeseries returns a timeseries that will invoke upload upon close +func NewTimeseries(upload AWSUploadFunc, uploader *s3manager.Uploader, key string) *Timeseries { + buf := bytes.NewBuffer([]byte{}) + return &Timeseries{buf, buf, upload, uploader, key} +} diff --git a/internal/awstools/timeseries_test.go b/internal/awstools/timeseries_test.go new file mode 100644 index 0000000..068b115 --- /dev/null +++ b/internal/awstools/timeseries_test.go @@ -0,0 +1,66 @@ +package awstools + +import ( + "io" + "io/ioutil" 
+ "testing"
+
+ "github.com/aws/aws-sdk-go/aws"
+ "github.com/aws/aws-sdk-go/aws/session"
+ "github.com/aws/aws-sdk-go/service/s3/s3manager"
+)
+
+func TestNewTimeseries(t *testing.T) {
+ sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("localhost")}))
+ upload := s3manager.NewUploader(sess)
+ var idxUp = 0
+ uploads := make(map[string]int)
+
+ var uploader = func(uploader *s3manager.Uploader, key string, bodyBuffer io.Reader) {
+ buf, _ := ioutil.ReadAll(bodyBuffer)
+ uploads[key] = len(buf)
+ idxUp++
+ }
+
+ ts := NewTimeseries(uploader, upload, "myfile")
+ // Write several times possible
+ for i := 0; i < 3; i++ {
+ n, err := ts.Write([]byte("test"))
+ if n != 4 || err != nil {
+ t.Errorf("Timeseries.Write() = %v %v, want 4", n, err)
+ return
+ }
+ if idxUp != 0 {
+ t.Errorf("Timeseries.Write() initiated an unexpected upload %v", uploads)
+ return
+ }
+ }
+
+ // Uploads file upon closing
+ ts.Close()
+ if idxUp != 1 {
+ t.Errorf(
+ "Timeseries.Close() didn't upload one file (%v files sent) with content %v",
+ idxUp,
+ uploads,
+ )
+ }
+ l, ok := uploads["myfile"]
+ if !ok || l != 4*3 {
+ t.Errorf(
+ "Timeseries.Close() didn't upload 'myfile' with 12 bytes (%v, %v)",
+ l,
+ ok,
+ )
+ }
+
+ // Can't write after closing
+ n, err := ts.Write([]byte("hello"))
+ if n != 0 || err == nil {
+ t.Errorf(
+ "Timeseries.Write() should have written nothing and err after closing (%v, %v)",
+ n,
+ err,
+ )
+ }
+}
diff --git a/internal/awstools/uploader.go b/internal/awstools/uploader.go new file mode 100644 index 0000000..fa697b8 --- /dev/null +++ b/internal/awstools/uploader.go @@ -0,0 +1,28 @@ +package awstools + +import ( + "io" + "log" + "strings" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/s3/s3manager" +) + +const awsBucket = "mats-l0-artifacts" + +// AWSUpload uploads file content to target bucket +func AWSUpload(uploader *s3manager.Uploader, key string, body io.Reader) { + _, err 
:= uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(awsBucket), + Key: aws.String(strings.ReplaceAll(key, "\\", "/")), + Body: body, + }) + if err != nil { + log.Printf("Failed to upload file %v, %v", key, err) + } + +} + +// AWSUploadFunc is the signature of an AWS upload function +type AWSUploadFunc func(uploader *s3manager.Uploader, key string, body io.Reader) diff --git a/internal/exports/aws_s3_writer.go b/internal/exports/aws_s3_writer.go index d0d1dee..e8dab74 100644 --- a/internal/exports/aws_s3_writer.go +++ b/internal/exports/aws_s3_writer.go @@ -4,42 +4,36 @@ import ( "bytes" "fmt" "image/png" - "io" "log" "os" "path/filepath" - "strings" "sync" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/innosat-mats/rac-extract-payload/internal/aez" + "github.com/innosat-mats/rac-extract-payload/internal/awstools" "github.com/innosat-mats/rac-extract-payload/internal/common" + "github.com/innosat-mats/rac-extract-payload/internal/timeseries" ) -const awsBucket = "mats-l0-artifacts" const awsS3Region = "eu-north-1" -// AWSUpload uploads file content to target bucket -func AWSUpload(uploader *s3manager.Uploader, key string, body io.Reader) { - _, err := uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(awsBucket), - Key: aws.String(strings.ReplaceAll(key, "\\", "/")), - Body: body, - }) - if err != nil { - log.Printf("Failed to upload file %v, %v", key, err) +func csvAWSWriterFactoryCreator( + upload awstools.AWSUploadFunc, + uploader *s3manager.Uploader, + project string, +) timeseries.CSVFactory { + return func(pkg *common.DataRecord, stream timeseries.OutStream) (timeseries.CSVWriter, error) { + key := fmt.Sprintf("%v/%v.csv", project, stream.String()) + return timeseries.NewCSV(awstools.NewTimeseries(upload, uploader, key), key), nil } - } -// AWSUploadFunc is the signature of an AWS upload function 
-type AWSUploadFunc func(uploader *s3manager.Uploader, key string, body io.Reader) - // AWSS3CallbackFactory generates callbacks for writing to S3 instead of disk func AWSS3CallbackFactory( - upload AWSUploadFunc, + upload awstools.AWSUploadFunc, project string, awsDescriptionPath string, writeImages bool, @@ -49,6 +43,9 @@ func AWSS3CallbackFactory( sess := session.Must(session.NewSession(&aws.Config{Region: aws.String(awsS3Region)})) uploader := s3manager.NewUploader(sess) + timeseriesCollection := timeseries.NewCollection( + csvAWSWriterFactoryCreator(upload, uploader, project), + ) if awsDescriptionPath != "" { awsDescription, err := os.Open(awsDescriptionPath) @@ -96,9 +93,18 @@ func AWSS3CallbackFactory( }() } } + + if writeTimeseries && pkg.Data != nil { + // Write to the dedicated target stream + err := timeseriesCollection.Write(&pkg) + if err != nil { + log.Println(err) + } + } } teardown := func() { + timeseriesCollection.CloseAll() wg.Wait() } diff --git a/internal/exports/aws_s3_writer_test.go b/internal/exports/aws_s3_writer_test.go index a22221e..d11b0eb 100644 --- a/internal/exports/aws_s3_writer_test.go +++ b/internal/exports/aws_s3_writer_test.go @@ -9,9 +9,12 @@ import ( "sync" "testing" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3/s3manager" "github.com/innosat-mats/rac-extract-payload/internal/aez" "github.com/innosat-mats/rac-extract-payload/internal/common" + "github.com/innosat-mats/rac-extract-payload/internal/timeseries" ) func TestAWSS3CallbackFactory(t *testing.T) { @@ -25,7 +28,7 @@ func TestAWSS3CallbackFactory(t *testing.T) { tests := []struct { name string args args - record common.DataRecord + records []common.DataRecord uploads map[string]int }{ { @@ -35,7 +38,7 @@ func TestAWSS3CallbackFactory(t *testing.T) { descriptionFileName: "myfile.txt", descriptionFileBody: []byte("Hello"), }, - common.DataRecord{}, + 
[]common.DataRecord{{}}, map[string]int{"myproj/ABOUT.txt": 5}, }, { @@ -45,7 +48,7 @@ func TestAWSS3CallbackFactory(t *testing.T) { descriptionFileName: "myfile.md", descriptionFileBody: []byte("Hello"), }, - common.DataRecord{}, + []common.DataRecord{{}}, map[string]int{"ABOUT.md": 5}, }, { @@ -54,7 +57,17 @@ func TestAWSS3CallbackFactory(t *testing.T) { project: "myproj", descriptionFileName: "", }, - common.DataRecord{}, + []common.DataRecord{{}}, + map[string]int{}, + }, + { + "Doesn't upload timeseries for no data", + args{ + project: "myproj", + descriptionFileName: "", + writeTimeseries: true, + }, + []common.DataRecord{{}}, map[string]int{}, }, { @@ -63,7 +76,7 @@ func TestAWSS3CallbackFactory(t *testing.T) { project: "myproj", writeImages: true, }, - common.DataRecord{ + []common.DataRecord{{ Origin: common.OriginDescription{Name: "MyRac.rac"}, Data: aez.CCDImage{ PackData: aez.CCDImagePackData{ @@ -74,7 +87,7 @@ func TestAWSS3CallbackFactory(t *testing.T) { }, }, Buffer: make([]byte, 2*2*2), // 2x2 pixels, 2 bytes per pix - }, + }}, map[string]int{ "myproj/MyRac_5000000000.png": 76, // 8 + header "myproj/MyRac_5000000000.json": 853, // length of the json @@ -86,7 +99,7 @@ func TestAWSS3CallbackFactory(t *testing.T) { project: "myproj", writeImages: false, }, - common.DataRecord{ + []common.DataRecord{{ Origin: common.OriginDescription{Name: "MyRac.rac"}, Data: aez.CCDImage{ PackData: aez.CCDImagePackData{ @@ -97,7 +110,7 @@ func TestAWSS3CallbackFactory(t *testing.T) { }, }, Buffer: make([]byte, 2*2*2), // 2x2 pixels, 2 bytes per pix - }, + }}, map[string]int{}, }, { @@ -106,7 +119,7 @@ func TestAWSS3CallbackFactory(t *testing.T) { project: "myproj", writeImages: true, }, - common.DataRecord{ + []common.DataRecord{{ Origin: common.OriginDescription{Name: "MyRac.rac"}, Data: aez.CCDImage{ PackData: aez.CCDImagePackData{ @@ -118,7 +131,7 @@ func TestAWSS3CallbackFactory(t *testing.T) { }, Error: errors.New("here be dragons"), Buffer: make([]byte, 2*2*2), 
// 2x2 pixels, 2 bytes per pix - }, + }}, map[string]int{}, }, { @@ -128,23 +141,31 @@ func TestAWSS3CallbackFactory(t *testing.T) { descriptionFileName: "info.json", descriptionFileBody: []byte("[42,42]"), writeImages: true, + writeTimeseries: true, }, - common.DataRecord{ - Origin: common.OriginDescription{Name: "MyRac.rac"}, - Data: aez.CCDImage{ - PackData: aez.CCDImagePackData{ - EXPTS: 5, - JPEGQ: aez.JPEGQUncompressed16bit, - NCOL: 1, - NROW: 2, + []common.DataRecord{ + { + Origin: common.OriginDescription{Name: "MyRac.rac"}, + Data: aez.CCDImage{ + PackData: aez.CCDImagePackData{ + EXPTS: 5, + JPEGQ: aez.JPEGQUncompressed16bit, + NCOL: 1, + NROW: 2, + }, }, + Buffer: make([]byte, 2*2*2), // 2x2 pixels, 2 bytes per pix + }, + { + Data: aez.HTR{}, }, - Buffer: make([]byte, 2*2*2), // 2x2 pixels, 2 bytes per pix }, map[string]int{ "myproj/ABOUT.json": 7, "myproj/MyRac_5000000000.png": 76, // 8 + header "myproj/MyRac_5000000000.json": 853, // length of the json + "myproj/CCD.csv": 649, // length of the first three lines csv (specs, header, datarow) + "myproj/HTR.csv": 975, }, }, } @@ -214,7 +235,9 @@ func TestAWSS3CallbackFactory(t *testing.T) { &wg, ) - callback(tt.record) + for _, record := range tt.records { + callback(record) + } teardown() if idxUp < len(tt.uploads) { @@ -223,3 +246,25 @@ func TestAWSS3CallbackFactory(t *testing.T) { }) } } + +func Test_csvAWSWriterFactoryCreator(t *testing.T) { + sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("localhost")})) + upload := s3manager.NewUploader(sess) + uploads := make(map[string]int) + + var uploader = func(uploader *s3manager.Uploader, key string, bodyBuffer io.Reader) { + buf, _ := ioutil.ReadAll(bodyBuffer) + uploads[key] = len(buf) + } + factory := csvAWSWriterFactoryCreator(uploader, upload, "myproject") + writer, err := factory(&common.DataRecord{Data: aez.HTR{}}, timeseries.HTR) + if err != nil { + t.Errorf("csvAWSWriterFactoryCreator()'s factory returned error %v", err) + } + 
writer.Close() + want := "myproject/HTR.csv" + _, ok := uploads[want] + if !ok { + t.Errorf("csvAWSWriterFactoryCreator()'s factory produced uploads %v, want key %v", uploads, want) + } +} diff --git a/internal/exports/disk_writer.go b/internal/exports/disk_writer.go index 7577644..6c7185d 100644 --- a/internal/exports/disk_writer.go +++ b/internal/exports/disk_writer.go @@ -18,7 +18,7 @@ func csvName(dir string, packetType string) string { return filepath.Join(dir, name) } -func csvFileWriterFactoryFactory( +func csvFileWriterFactoryCreator( dir string, ) timeseries.CSVFactory { return func(pkg *common.DataRecord, stream timeseries.OutStream) (timeseries.CSVWriter, error) { @@ -40,7 +40,7 @@ func DiskCallbackFactory( wg *sync.WaitGroup, ) (common.Callback, common.CallbackTeardown) { var err error - timeseriesCollection := timeseries.NewCollection(csvFileWriterFactoryFactory(output)) + timeseriesCollection := timeseries.NewCollection(csvFileWriterFactoryCreator(output)) if writeImages || writeTimeseries { // Create Directory and File diff --git a/internal/exports/disk_writer_test.go b/internal/exports/disk_writer_test.go index f843bd0..f0b817e 100644 --- a/internal/exports/disk_writer_test.go +++ b/internal/exports/disk_writer_test.go @@ -34,7 +34,7 @@ func Test_csvName(t *testing.T) { } } -func TestDiskCallbackFactory(t *testing.T) { +func TestDiskCallbackFactoryCreator(t *testing.T) { type args struct { writeImages bool writeTimeseries bool diff --git a/internal/timeseries/csv.go b/internal/timeseries/csv.go index b32e46a..f2e1332 100644 --- a/internal/timeseries/csv.go +++ b/internal/timeseries/csv.go @@ -5,6 +5,8 @@ import ( "fmt" "io" "os" + + "github.com/innosat-mats/rac-extract-payload/internal/awstools" ) //CSV gives easy access for csv writing @@ -39,6 +41,11 @@ func (csv *CSV) Close() { if ok { f.Close() } + case *awstools.Timeseries: + writer, ok := csv.writer.(*awstools.Timeseries) + if ok { + writer.Close() + } } } diff --git 
a/internal/timeseries/csv_test.go b/internal/timeseries/csv_test.go index b7d004b..41de6d8 100644 --- a/internal/timeseries/csv_test.go +++ b/internal/timeseries/csv_test.go @@ -2,10 +2,16 @@ package timeseries import ( "encoding/csv" + "io" "io/ioutil" "os" "strings" "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/innosat-mats/rac-extract-payload/internal/awstools" ) func getTestFile() (*CSV, *os.File, error) { @@ -17,7 +23,7 @@ func getTestFile() (*CSV, *os.File, error) { } -func Test_CSV_Close(t *testing.T) { +func Test_CSV_Close_WithFile(t *testing.T) { csv, file, err := getTestFile() defer os.Remove(file.Name()) if err != nil { @@ -30,6 +36,24 @@ func Test_CSV_Close(t *testing.T) { t.Error("CSV.Close(), didn't Close file") } } +func Test_CSV_Close_Timeseries(t *testing.T) { + sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("localhost")})) + upload := s3manager.NewUploader(sess) + var idxUp = 0 + var uploader = func(uploader *s3manager.Uploader, key string, bodyBuffer io.Reader) { + idxUp++ + } + key := "test" + ts := awstools.NewTimeseries(uploader, upload, key) + csv := NewCSV(ts, key) + if idxUp != 0 { + t.Errorf("Expected 0 uploads before CSV.Close(), found %v", idxUp) + } + csv.Close() + if idxUp != 1 { + t.Errorf("Expected 1 upload after CSV.Close(), found %v", idxUp) + } +} func Test_CSV_SetSpecifications(t *testing.T) { csv, file, err := getTestFile()