Skip to content

Commit

Permalink
Only partition ccd hourly (#176)
Browse files Browse the repository at this point in the history
* Lock linux build env to 20.04

* Only partition CCD by hour

* Bump versions
  • Loading branch information
Andreas Skyman authored Mar 1, 2023
1 parent 6e20c68 commit f38baa3
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 19 deletions.
49 changes: 38 additions & 11 deletions internal/exports/parquet_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,46 @@ func Test_parquetName(t *testing.T) {
packet common.DataRecord
stream timeseries.OutStream
}
record := common.DataRecord{
Origin: &common.OriginDescription{Name: "File1.rac"},
RamsesHeader: &ramses.Ramses{},
RamsesTMHeader: &ramses.TMHeader{},
SourceHeader: &innosat.SourcePacketHeader{},
TMHeader: &innosat.TMHeader{},
Data: &aez.STAT{},
record := []common.DataRecord{
{
Origin: &common.OriginDescription{Name: "File1.rac"},
RamsesHeader: &ramses.Ramses{},
RamsesTMHeader: &ramses.TMHeader{},
SourceHeader: &innosat.SourcePacketHeader{},
TMHeader: &innosat.TMHeader{},
Data: &aez.STAT{},
},
{
Origin: &common.OriginDescription{Name: "File2.rac"},
RamsesHeader: &ramses.Ramses{},
RamsesTMHeader: &ramses.TMHeader{},
SourceHeader: &innosat.SourcePacketHeader{},
TMHeader: &innosat.TMHeader{},
RID: aez.CCD3,
Data: &aez.CCDImage{
PackData: &aez.CCDImagePackData{
JPEGQ: aez.JPEGQUncompressed16bit,
NCOL: 1,
NROW: 2,
EXPTS: 6,
},
ImageFileName: "File1_6000000000_3.png",
},
Buffer: []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff},
},
}
stream := []timeseries.OutStream{
timeseries.OutStreamFromDataRecord(&record[0]),
timeseries.OutStreamFromDataRecord(&record[1]),
}
stream := timeseries.OutStreamFromDataRecord(&record)
tests := []struct {
name string
args args
want string
}{
{"Case 1", args{".", record, stream}, filepath.FromSlash("STAT/1980/1/5/23/File1.parquet")},
{"Case 2", args{"my/dir", record, stream}, filepath.FromSlash("my/dir/STAT/1980/1/5/23/File1.parquet")},
{"File without path", args{".", record[0], stream[0]}, filepath.FromSlash("STAT/1980/1/5/File1.parquet")},
{"File with path", args{"my/dir", record[0], stream[0]}, filepath.FromSlash("my/dir/STAT/1980/1/5/File1.parquet")},
{"CCD file", args{".", record[1], stream[1]}, filepath.FromSlash("CCD/1980/1/5/23/File2.parquet")},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -291,7 +315,10 @@ func TestParquetCallbackFactoryCreator(t *testing.T) {

for _, want := range tt.wantFiles {
// Test each output for file name
savePath := filepath.Join(dir, want.prefix, "1980", "1", "5", "23")
savePath := filepath.Join(dir, want.prefix, "1980", "1", "5")
if want.prefix == "CCD" {
savePath = filepath.Join(savePath, "23")
}
path := filepath.Join(savePath, want.base)
_, err := os.ReadFile(path)
if err != nil {
Expand Down
7 changes: 6 additions & 1 deletion internal/timeseries/parquet_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ func ParquetName(pkg *common.DataRecord, stream OutStream) string {
fmt.Sprintf("%v", tmTime.Year()),
fmt.Sprintf("%v", int(tmTime.Month())),
fmt.Sprintf("%v", tmTime.Day()),
fmt.Sprintf("%v", tmTime.Hour()),
)
if stream.String() == "CCD" {
prefix = filepath.Join(
prefix,
fmt.Sprintf("%v", tmTime.Hour()),
)
}
baseName := filepath.Base(pkg.Origin.Name)
ext := filepath.Ext(pkg.Origin.Name)
name := fmt.Sprintf("%v.parquet", strings.TrimSuffix(baseName, ext))
Expand Down
37 changes: 32 additions & 5 deletions internal/timeseries/parquet_collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestParquetCollection_Write(t *testing.T) {
},
false,
[]string{
filepath.FromSlash("STAT/1980/1/5/23/test1.parquet"),
filepath.FromSlash("STAT/1980/1/5/test1.parquet"),
},
},
{
Expand All @@ -71,8 +71,8 @@ func TestParquetCollection_Write(t *testing.T) {
},
false,
[]string{
filepath.FromSlash("HTR/1980/1/5/23/test1.parquet"),
filepath.FromSlash("STAT/1980/1/5/23/test1.parquet"),
filepath.FromSlash("HTR/1980/1/5/test1.parquet"),
filepath.FromSlash("STAT/1980/1/5/test1.parquet"),
},
},
{
Expand All @@ -97,8 +97,35 @@ func TestParquetCollection_Write(t *testing.T) {
},
false,
[]string{
filepath.FromSlash("STAT/1980/1/5/23/test1.parquet"),
filepath.FromSlash("STAT/1980/1/5/23/test2.parquet"),
filepath.FromSlash("STAT/1980/1/5/test1.parquet"),
filepath.FromSlash("STAT/1980/1/5/test2.parquet"),
},
},
{
"CCD is partitioned by hour",
[]common.DataRecord{
{
Origin: &common.OriginDescription{Name: "test1"},
RamsesHeader: &ramses.Ramses{},
RamsesTMHeader: &ramses.TMHeader{},
SourceHeader: &innosat.SourcePacketHeader{},
TMHeader: &innosat.TMHeader{},
RID: aez.CCD3,
Data: &aez.CCDImage{
PackData: &aez.CCDImagePackData{
JPEGQ: aez.JPEGQUncompressed16bit,
NCOL: 1,
NROW: 2,
EXPTS: 6,
},
ImageFileName: "File1_6000000000_3.png",
},
Buffer: []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff},
},
},
false,
[]string{
filepath.FromSlash("CCD/1980/1/5/23/test1.parquet"),
},
},
}
Expand Down
4 changes: 2 additions & 2 deletions raclambda/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from raclambda.raclambda_stack import RacLambdaStack


RAC_VERSION = "v1.3.1"
RAC_VERSION = "v1.4.0"
RAC_OS = "Linux"
RAC_URL = f"https://github.com/innosat-mats/rac-extract-payload/releases/download/{RAC_VERSION}/Rac_for_{RAC_OS}.tar.gz" # noqa: E501
RAC_DIR = "./raclambda/handler"
Expand All @@ -37,7 +37,7 @@
app,
"RacLambdaStack",
input_bucket_name="ops-payload-level0-source",
output_bucket_name="ops-payload-level0-v0.2",
output_bucket_name="ops-payload-level0-v0.3",
queue_arn_export_name="L0RACFetcherStackOutputQueue",
config_ssm_name="/rclone/l0-fetcher",
rclone_arn="arn:aws:lambda:eu-north-1:671150066425:layer:rclone-amd64:1",
Expand Down

0 comments on commit f38baa3

Please sign in to comment.