diff --git a/internal/exports/parquet_writer_test.go b/internal/exports/parquet_writer_test.go index a5b851d..a9bbc45 100644 --- a/internal/exports/parquet_writer_test.go +++ b/internal/exports/parquet_writer_test.go @@ -20,22 +20,46 @@ func Test_parquetName(t *testing.T) { packet common.DataRecord stream timeseries.OutStream } - record := common.DataRecord{ - Origin: &common.OriginDescription{Name: "File1.rac"}, - RamsesHeader: &ramses.Ramses{}, - RamsesTMHeader: &ramses.TMHeader{}, - SourceHeader: &innosat.SourcePacketHeader{}, - TMHeader: &innosat.TMHeader{}, - Data: &aez.STAT{}, + record := []common.DataRecord{ + { + Origin: &common.OriginDescription{Name: "File1.rac"}, + RamsesHeader: &ramses.Ramses{}, + RamsesTMHeader: &ramses.TMHeader{}, + SourceHeader: &innosat.SourcePacketHeader{}, + TMHeader: &innosat.TMHeader{}, + Data: &aez.STAT{}, + }, + { + Origin: &common.OriginDescription{Name: "File2.rac"}, + RamsesHeader: &ramses.Ramses{}, + RamsesTMHeader: &ramses.TMHeader{}, + SourceHeader: &innosat.SourcePacketHeader{}, + TMHeader: &innosat.TMHeader{}, + RID: aez.CCD3, + Data: &aez.CCDImage{ + PackData: &aez.CCDImagePackData{ + JPEGQ: aez.JPEGQUncompressed16bit, + NCOL: 1, + NROW: 2, + EXPTS: 6, + }, + ImageFileName: "File1_6000000000_3.png", + }, + Buffer: []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + } + stream := []timeseries.OutStream{ + timeseries.OutStreamFromDataRecord(&record[0]), + timeseries.OutStreamFromDataRecord(&record[1]), } - stream := timeseries.OutStreamFromDataRecord(&record) tests := []struct { name string args args want string }{ - {"Case 1", args{".", record, stream}, filepath.FromSlash("STAT/1980/1/5/23/File1.parquet")}, - {"Case 2", args{"my/dir", record, stream}, filepath.FromSlash("my/dir/STAT/1980/1/5/23/File1.parquet")}, + {"File without path", args{".", record[0], stream[0]}, filepath.FromSlash("STAT/1980/1/5/File1.parquet")}, + {"File with path", args{"my/dir", record[0], stream[0]}, filepath.FromSlash("my/dir/STAT/1980/1/5/File1.parquet")}, + {"CCD file", args{".", record[1], stream[1]}, filepath.FromSlash("CCD/1980/1/5/23/File2.parquet")}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -291,7 +315,10 @@ func TestParquetCallbackFactoryCreator(t *testing.T) { for _, want := range tt.wantFiles { // Test each output for file name - savePath := filepath.Join(dir, want.prefix, "1980", "1", "5", "23") + savePath := filepath.Join(dir, want.prefix, "1980", "1", "5") + if want.prefix == "CCD" { + savePath = filepath.Join(savePath, "23") + } path := filepath.Join(savePath, want.base) _, err := os.ReadFile(path) if err != nil { diff --git a/internal/timeseries/parquet_collection.go b/internal/timeseries/parquet_collection.go index 1198564..fa3d6d6 100644 --- a/internal/timeseries/parquet_collection.go +++ b/internal/timeseries/parquet_collection.go @@ -32,8 +32,13 @@ func ParquetName(pkg *common.DataRecord, stream OutStream) string { fmt.Sprintf("%v", tmTime.Year()), fmt.Sprintf("%v", int(tmTime.Month())), fmt.Sprintf("%v", tmTime.Day()), - fmt.Sprintf("%v", tmTime.Hour()), ) + if stream.String() == "CCD" { + prefix = filepath.Join( + prefix, + fmt.Sprintf("%v", tmTime.Hour()), + ) + } baseName := filepath.Base(pkg.Origin.Name) ext := filepath.Ext(pkg.Origin.Name) name := fmt.Sprintf("%v.parquet", strings.TrimSuffix(baseName, ext)) diff --git a/internal/timeseries/parquet_collection_test.go b/internal/timeseries/parquet_collection_test.go index b056ccb..4c92311 100644 --- a/internal/timeseries/parquet_collection_test.go +++ b/internal/timeseries/parquet_collection_test.go @@ -46,7 +46,7 @@ func TestParquetCollection_Write(t *testing.T) { }, false, []string{ - filepath.FromSlash("STAT/1980/1/5/23/test1.parquet"), + filepath.FromSlash("STAT/1980/1/5/test1.parquet"), }, }, { @@ -71,8 +71,8 @@ func TestParquetCollection_Write(t *testing.T) { }, false, []string{ - filepath.FromSlash("HTR/1980/1/5/23/test1.parquet"), - filepath.FromSlash("STAT/1980/1/5/23/test1.parquet"), + filepath.FromSlash("HTR/1980/1/5/test1.parquet"), + filepath.FromSlash("STAT/1980/1/5/test1.parquet"), }, }, { @@ -97,8 +97,35 @@ func TestParquetCollection_Write(t *testing.T) { }, false, []string{ - filepath.FromSlash("STAT/1980/1/5/23/test1.parquet"), - filepath.FromSlash("STAT/1980/1/5/23/test2.parquet"), + filepath.FromSlash("STAT/1980/1/5/test1.parquet"), + filepath.FromSlash("STAT/1980/1/5/test2.parquet"), + }, + }, + { + "CCD is partitioned by hour", + []common.DataRecord{ + { + Origin: &common.OriginDescription{Name: "test1"}, + RamsesHeader: &ramses.Ramses{}, + RamsesTMHeader: &ramses.TMHeader{}, + SourceHeader: &innosat.SourcePacketHeader{}, + TMHeader: &innosat.TMHeader{}, + RID: aez.CCD3, + Data: &aez.CCDImage{ + PackData: &aez.CCDImagePackData{ + JPEGQ: aez.JPEGQUncompressed16bit, + NCOL: 1, + NROW: 2, + EXPTS: 6, + }, + ImageFileName: "File1_6000000000_3.png", + }, + Buffer: []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}, + }, + }, + false, + []string{ + filepath.FromSlash("CCD/1980/1/5/23/test1.parquet"), }, }, } diff --git a/raclambda/app.py b/raclambda/app.py index 1d30bd6..95e9f56 100644 --- a/raclambda/app.py +++ b/raclambda/app.py @@ -10,7 +10,7 @@ from raclambda.raclambda_stack import RacLambdaStack -RAC_VERSION = "v1.3.1" +RAC_VERSION = "v1.4.0" RAC_OS = "Linux" RAC_URL = f"https://github.com/innosat-mats/rac-extract-payload/releases/download/{RAC_VERSION}/Rac_for_{RAC_OS}.tar.gz" # noqa: E501 RAC_DIR = "./raclambda/handler" @@ -37,7 +37,7 @@ app, "RacLambdaStack", input_bucket_name="ops-payload-level0-source", - output_bucket_name="ops-payload-level0-v0.2", + output_bucket_name="ops-payload-level0-v0.3", queue_arn_export_name="L0RACFetcherStackOutputQueue", config_ssm_name="/rclone/l0-fetcher", rclone_arn="arn:aws:lambda:eu-north-1:671150066425:layer:rclone-amd64:1",