diff --git a/common.go b/common.go index d892a89..55f7ee8 100644 --- a/common.go +++ b/common.go @@ -6,20 +6,15 @@ package main import ( - "encoding/json" "errors" "flag" "fmt" "io" - "log" "os" "strings" - "time" "github.com/evolution-gaming/ease/internal/encoding" "github.com/evolution-gaming/ease/internal/logging" - "github.com/evolution-gaming/ease/internal/vqm" - "github.com/jszwec/csvutil" ) // Commander interface should be implemented by commands and sub-commands. @@ -53,165 +48,6 @@ func printSubCommandUsage(longHelp string, fs *flag.FlagSet) { fs.PrintDefaults() } -// namedVqmResult is structure that wraps vqm.Result with a name. -type namedVqmResult struct { - Name string - vqm.Result -} - -// report contains application execution result. -type report struct { - EncodingResult encoding.PlanResult - VQMResults []namedVqmResult -} - -// WriteJSON writes application execution result as JSON. -func (r *report) WriteJSON(w io.Writer) error { - // Write Plan execution result to JSON (for now) - res, err := json.MarshalIndent(r, "", " ") - if err != nil { - return fmt.Errorf("marshaling encoding result to JSON: %w", err) - } - _, err = w.Write(res) - if err != nil { - return fmt.Errorf("writing encoding result %w", err) - } - return nil -} - -// csvRecord contains result fields from one encode. -type csvRecord struct { - Name string - SourceFile string - CompressedFile string - Cmd string - HStime string - HUtime string - HElapsed string - Stime time.Duration - Utime time.Duration - Elapsed time.Duration - MaxRss int64 - VideoDuration float64 - AvgEncodingSpeed float64 - PSNR float64 - MS_SSIM float64 - VMAF float64 -} - -// Wrap rows of csvRecords mainly to attach relevant methods. -type csvReport struct { - rows []csvRecord -} - -func newCsvReport(r *report) (*csvReport, error) { - size := len(r.EncodingResult.RunResults) - if size != len(r.VQMResults) { - return nil, errors.New("Encoding result and VQM result size do not match") - } - - var report csvReport - report.rows = make([]csvRecord, 0, size) - - // Need to create an intermediate mapping from CompressedFile to VQM metrics to make - // merging fields from two sources easier (we cannot rely on order). CompressedFile - // being a unique identifier (Name does not work when there are multiple input files). - tVqms := make(map[string]vqm.VideoQualityMetrics, size) - for _, v := range r.VQMResults { - tVqms[v.CompressedFile] = v.Metrics - } - - // Final loop to merge into a single report. - for _, v := range r.EncodingResult.RunResults { - vqm, ok := tVqms[v.CompressedFile] - if !ok { - return nil, fmt.Errorf("no VQMs for map key: %s", v.CompressedFile) - } - report.rows = append(report.rows, csvRecord{ - Name: v.Name, - SourceFile: v.SourceFile, - CompressedFile: v.CompressedFile, - Cmd: v.Cmd, - HStime: v.Stats.HStime, - HUtime: v.Stats.HUtime, - HElapsed: v.Stats.HElapsed, - Stime: v.Stats.Stime, - Utime: v.Stats.Utime, - Elapsed: v.Stats.Elapsed, - MaxRss: v.Stats.MaxRss, - VideoDuration: v.VideoDuration, - AvgEncodingSpeed: v.AvgEncodingSpeed, - PSNR: vqm.PSNR, - MS_SSIM: vqm.MS_SSIM, - VMAF: vqm.VMAF, - }) - } - - return &report, nil -} - -// WriteCSV saves flat application report representation to io.Writer. -func (r *csvReport) WriteCSV(w io.Writer) error { - data, err := csvutil.Marshal(r.rows) - if err != nil { - return err - } - _, err2 := w.Write(data) - if err2 != nil { - return err2 - } - return nil -} - -// parseReportFile is a helper to read and parse report JSON file into report type. -func parseReportFile(fPath string) *report { - var r report - - b, err := os.ReadFile(fPath) - if err != nil { - log.Panicf("Unable to read file %s: %v", fPath, err) - } - - if err := json.Unmarshal(b, &r); err != nil { - log.Panic(err) - } - - return &r -} - -// sourceData is a helper data structure with fields related to single encoded file. -type sourceData struct { - CompressedFile string - WorkDir string - VqmResultFile string -} - -// extractSourceData create mapping from compressed file to sourceData. -// -// Since in report file we have separate keys RunResults and VQMResults and we -// need to merge fields from both, we create mapping from unique CompressedFile -// field to sourceData. -func extractSourceData(r *report) map[string]sourceData { - s := make(map[string]sourceData) - // Create map to sourceData (incomplete at this point) from RunResults - for i := range r.EncodingResult.RunResults { - v := &r.EncodingResult.RunResults[i] - sd := s[v.CompressedFile] - sd.WorkDir = v.WorkDir - sd.CompressedFile = v.CompressedFile - s[v.CompressedFile] = sd - } - - // Fill-in missing VqmResultFile field from VQMResult. - for i := range r.VQMResults { - v := &r.VQMResults[i] - sd := s[v.CompressedFile] - sd.VqmResultFile = v.ResultFile - s[v.CompressedFile] = sd - } - return s -} - // unrollResultErrors helper to unroll all errors from RunResults into a string. func unrollResultErrors(results []encoding.RunResult) string { sb := strings.Builder{} diff --git a/common_test.go b/common_test.go index 6f16b6d..60683b4 100644 --- a/common_test.go +++ b/common_test.go @@ -6,83 +6,11 @@ package main import ( - "bytes" - "os" - "strings" "testing" "github.com/stretchr/testify/assert" ) -func Test_extractSourceData(t *testing.T) { - given := parseReportFile("testdata/encoding_artifacts/report.json") - want := map[string]sourceData{ - "out/testsrc01_libx264.mp4": { - CompressedFile: "out/testsrc01_libx264.mp4", - WorkDir: "/tmp", - VqmResultFile: "out/testsrc01_libx264_vqm.json", - }, - "out/testsrc01_libx265.mp4": { - CompressedFile: "out/testsrc01_libx265.mp4", - WorkDir: "/tmp", - VqmResultFile: "out/testsrc01_libx265_vqm.json", - }, - "out/testsrc02_libx264.mp4": { - CompressedFile: "out/testsrc02_libx264.mp4", - WorkDir: "/tmp", - VqmResultFile: "out/testsrc02_libx264_vqm.json", - }, - "out/testsrc02_libx265.mp4": { - CompressedFile: "out/testsrc02_libx265.mp4", - WorkDir: "/tmp", - VqmResultFile: "out/testsrc02_libx265_vqm.json", - }, - } - - got := extractSourceData(given) - assert.Equal(t, want, got) -} - -func Test_parseReportFile(t *testing.T) { - got := parseReportFile("testdata/encoding_artifacts/report.json") - t.Log("Should have RunResults") - assert.Len(t, got.EncodingResult.RunResults, 4) - - t.Log("Should have VQMResults") - assert.Len(t, got.VQMResults, 4) -} - -func Test_report_WriteJSON(t *testing.T) { - // Do the round-trip of JOSN report unmarshalling-marshalling. - reportFile := "testdata/encoding_artifacts/report.json" - parsedReport := parseReportFile(reportFile) - - var got bytes.Buffer - err := parsedReport.WriteJSON(&got) - assert.NoError(t, err) - - want, err := os.ReadFile(reportFile) - assert.NoError(t, err) - wantStr := strings.TrimRight(string(want), "\n") - - assert.Equal(t, wantStr, got.String()) -} - -func Test_csvReport_writeCSV(t *testing.T) { - // Create report from fixture data. - reportFile := "testdata/encoding_artifacts/report.json" - parsedReport := parseReportFile(reportFile) - - csvReport, err := newCsvReport(parsedReport) - assert.NoError(t, err) - - var b bytes.Buffer - err = csvReport.WriteCSV(&b) - assert.NoError(t, err) - - assert.Len(t, b.Bytes(), 1332) -} - func Test_all_Positive(t *testing.T) { floatTests := map[string]struct { given []float64 diff --git a/config.go b/config.go index 81fdea9..d619236 100644 --- a/config.go +++ b/config.go @@ -23,7 +23,7 @@ import ( var ( ErrInvalidConfig = errors.New("invalid configuration") - defaultReportFile = "report.json" + defaultReportFile = "report.csv" ) // Config represent application configuration. diff --git a/ease_test.go b/ease_test.go index 5c88f3f..cf9214c 100644 --- a/ease_test.go +++ b/ease_test.go @@ -6,6 +6,7 @@ package main import ( + "encoding/csv" "fmt" "io" "os" @@ -13,6 +14,8 @@ import ( "path/filepath" "testing" + "github.com/evolution-gaming/ease/internal/encoding" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -23,24 +26,33 @@ func Test_RunApp_Run(t *testing.T) { ePlan := fixPlanConfig(t) outDir := path.Join(tempDir, "out") - t.Log("Should succeed execution with -plan flag") - // Run command will generate encoding artifacts and analysis artifacts. - err := CreateRunCommand().Run([]string{"-plan", ePlan, "-out-dir", outDir}) - assert.NoError(t, err, "Unexpected error running encode") + t.Run("Should succeed execution with -plan flag", func(t *testing.T) { + // Run command will generate encoding artifacts and analysis artifacts. + app := CreateRunCommand() + err := app.Run([]string{"-plan", ePlan, "-out-dir", outDir}) + assert.NoError(t, err, "Unexpected error running encode") + }) - buf, err2 := os.ReadFile(path.Join(outDir, "report.json")) - assert.NoError(t, err2, "Unexpected error reading report.json") - assert.Greater(t, len(buf), 0, "No data in report file") + t.Run("Should have a CSV report file", func(t *testing.T) { + fd, err2 := os.Open(path.Join(outDir, "report.csv")) + assert.NoError(t, err2, "Unexpected error opening report.csv") + defer fd.Close() + records, err3 := csv.NewReader(fd).ReadAll() + assert.NoError(t, err3, "Unexpected error reading CSV records") + // Expect 2 records: CSV header + record for 1 encoding. + assert.Len(t, records, 2, "Unexpected number of records in report file") + }) - t.Log("Analyse should create bitrate, VMAF, PSNR and SSIM plots") - bitratePlots, _ := filepath.Glob(fmt.Sprintf("%s/*/*bitrate.png", outDir)) - assert.Len(t, bitratePlots, 1, "Expecting one file for bitrate plot") + t.Run("Should create plots", func(t *testing.T) { + bitratePlots, _ := filepath.Glob(fmt.Sprintf("%s/*/*bitrate.png", outDir)) + assert.Len(t, bitratePlots, 1, "Expecting one file for bitrate plot") - vmafPlots, _ := filepath.Glob(fmt.Sprintf("%s/*/*vmaf.png", outDir)) - assert.Len(t, vmafPlots, 1, "Expecting one file for VMAF plot") + vmafPlots, _ := filepath.Glob(fmt.Sprintf("%s/*/*vmaf.png", outDir)) + assert.Len(t, vmafPlots, 1, "Expecting one file for VMAF plot") - psnrPlots, _ := filepath.Glob(fmt.Sprintf("%s/*/*psnr.png", outDir)) - assert.Len(t, psnrPlots, 1, "Expecting one file for PSNR plot") + psnrPlots, _ := filepath.Glob(fmt.Sprintf("%s/*/*psnr.png", outDir)) + assert.Len(t, psnrPlots, 1, "Expecting one file for PSNR plot") + }) } /************************************* @@ -165,6 +177,16 @@ func Test_RunApp_Run_WithInvalidApplicationConfig(t *testing.T) { assert.ErrorAs(t, gotErr, &expErr, "Expecting error of type AppError") } +func Test_RunApp_Run_MisalignedFrames(t *testing.T) { + plan := fixPlanConfigMisalignedFrames(t) + app := CreateRunCommand() + gotErr := app.Run([]string{"-plan", plan, "-out-dir", t.TempDir()}) + + var expErr *AppError + assert.ErrorAs(t, gotErr, &expErr, "Expecting error of type AppError") + assert.ErrorContains(t, gotErr, "VQM calculations had errors, see log for reasons") +} + // Functional tests for other sub-commands.. func TestIntegration_AllSubcommands(t *testing.T) { tempDir := t.TempDir() @@ -176,7 +198,7 @@ func TestIntegration_AllSubcommands(t *testing.T) { err := CreateRunCommand().Run([]string{"-plan", ePlan, "-out-dir", outDir}) require.NoError(t, err) - t.Run("Vqmplot should create plots", func(t *testing.T) { + t.Run("vqmplot should create plots", func(t *testing.T) { var vqmFile string // Need to get file with VQMs from encode stage. m, _ := filepath.Glob(fmt.Sprintf("%s/*vqm.json", outDir)) @@ -193,7 +215,7 @@ func TestIntegration_AllSubcommands(t *testing.T) { } }) - t.Run("Bitrate should create bitrate plot", func(t *testing.T) { + t.Run("bitrate should create bitrate plot", func(t *testing.T) { var compressedFile string // Need to get compressed file from encode stage. m, _ := filepath.Glob(fmt.Sprintf("%s/*.mp4", outDir)) @@ -205,4 +227,19 @@ func TestIntegration_AllSubcommands(t *testing.T) { assert.NoError(t, err, "Unexpected error running bitrate") assert.FileExists(t, outFile, "bitrate plot file missing") }) + + t.Run("new-plan should create plan template", func(t *testing.T) { + planFile := path.Join(t.TempDir(), "plan.json") + err := CreateNewPlanCommand().Run([]string{"-i", "video1.mp4", "-o", planFile}) + assert.NoError(t, err) + + b, err := os.ReadFile(planFile) + assert.NoError(t, err) + pc, err := encoding.NewPlanConfigFromJSON(b) + assert.NoError(t, err) + + assert.Len(t, pc.Inputs, 1) + assert.Equal(t, pc.Inputs[0], "video1.mp4") + assert.True(t, len(pc.Schemes) > 0) + }) } diff --git a/go.mod b/go.mod index d2bc519..71c6b7f 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/evolution-gaming/ease -go 1.18 +go 1.20 require ( github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 diff --git a/helpers_test.go b/helpers_test.go index b50abc7..7ef065e 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -12,6 +12,8 @@ import ( "os" "path" "testing" + + "github.com/evolution-gaming/ease/internal/tools" ) // fixPlanConfig fixture provides simple encoding plan. @@ -20,17 +22,17 @@ import ( // purposes of tests it is irrelevant if we use realistic encoder or just a // simple file copy. func fixPlanConfig(t *testing.T) (fPath string) { - payload := []byte(fmt.Sprintf(`{ + payload := []byte(`{ "Inputs": [ "testdata/video/testsrc01.mp4" ], "Schemes": [ { "Name": "simple_src_duplication", - "CommandTpl": ["cp -v ", "%%INPUT%% ", "%%OUTPUT%%.mp4"] + "CommandTpl": ["cp -v ", "%INPUT% ", "%OUTPUT%.mp4"] } ] - }`)) + }`) fPath = path.Join(t.TempDir(), "minimal.json") err := os.WriteFile(fPath, payload, fs.FileMode(0o644)) if err != nil { @@ -78,3 +80,32 @@ func fixCreateFakeFfmpegAndPutItOnPath(t *testing.T) { t.Fatalf("Failure copying: %v", err) } } + +// fixPlanConfigMisalignedFrames fixture returns a plan which will result in encoded file +// shorter by 1 frame e.g. first frame dropped. +// +// Note: this plan assumes ffmpeg doing actual encoding! +func fixPlanConfigMisalignedFrames(t *testing.T) (fPath string) { + ffmpegPath, err := tools.FfmpegPath() + if err != nil { + t.Fatalf("ffmpeg not found: %v", err) + } + payload := []byte(fmt.Sprintf(`{ + "Inputs": [ + "testdata/video/testsrc01.mp4" + ], + "Schemes": [ + { + "Name": "misaligned", + "CommandTpl": ["%s -i %%INPUT%% -vf \"trim=start_frame=1\" %%OUTPUT%%.mp4"] + } + ] + }`, ffmpegPath)) + + fPath = path.Join(t.TempDir(), "misaligned_plan.json") + err = os.WriteFile(fPath, payload, fs.FileMode(0o644)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + return +} diff --git a/internal/analysis/plot_test.go b/internal/analysis/plot_test.go index 044aef4..1ab70ef 100644 --- a/internal/analysis/plot_test.go +++ b/internal/analysis/plot_test.go @@ -27,8 +27,9 @@ func getVmafValues(t *testing.T) []float64 { j, err := os.Open(frameMetricsFile) require.NoError(t, err) + defer j.Close() - err2 := metrics.FromJSON(j) + err2 := json.NewDecoder(j).Decode(&metrics) require.NoError(t, err2, "Error Unmarshaling metrics") for _, v := range metrics { diff --git a/internal/encoding/plan.go b/internal/encoding/plan.go index 6ff36f1..3f68656 100644 --- a/internal/encoding/plan.go +++ b/internal/encoding/plan.go @@ -72,7 +72,7 @@ func (s *EncoderCmd) Run() RunResult { r.AddError(err) return r } else { - logging.Infof("Output redirected to file: %s", f.Name()) + logging.Debugf("Output redirected to file: %s", f.Name()) outWriter = io.MultiWriter(memWriter, f) defer f.Close() } @@ -243,9 +243,8 @@ func (s *Plan) Run() (PlanResult, error) { } for i := range s.Commands { - logging.Infof("Start encoding %s -> %s", s.Commands[i].SourceFile, s.Commands[i].CompressedFile) + logging.Infof("Encoding %s -> %s", s.Commands[i].SourceFile, s.Commands[i].CompressedFile) result.RunResults[i] = s.Commands[i].Run() - logging.Infof("Done encoding %s -> %s", s.Commands[i].SourceFile, s.Commands[i].CompressedFile) } result.EndTime = time.Now() diff --git a/internal/metric/store.go b/internal/metric/store.go new file mode 100644 index 0000000..5770ca2 --- /dev/null +++ b/internal/metric/store.go @@ -0,0 +1,134 @@ +// Copyright ©2022 Evolution. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +// Centralised store of various encode metrics. + +package metric + +import ( + "errors" + "fmt" + "sync" + "time" +) + +var ErrRecordNotFound = errors.New("record not found") + +type ID int64 + +type Store struct { + mu sync.RWMutex + records map[ID]Record + next ID +} + +func NewStore() *Store { + return &Store{ + records: make(map[ID]Record), + } +} + +func (s *Store) Insert(r Record) ID { + s.mu.Lock() + defer s.mu.Unlock() + + s.records[s.next] = r + id := s.next + s.next++ + + return id +} + +func (s *Store) Get(id ID) (Record, error) { + s.mu.RLock() + defer s.mu.RUnlock() + r, ok := s.records[id] + if !ok { + return r, fmt.Errorf("getting record: %w", ErrRecordNotFound) + } + + return r, nil +} + +func (s *Store) Exists(id ID) bool { + s.mu.RLock() + defer s.mu.RUnlock() + _, exists := s.records[id] + + return exists +} + +func (s *Store) GetIDs() []ID { + s.mu.RLock() + defer s.mu.RUnlock() + + ids := make([]ID, 0, len(s.records)) + for id := range s.records { + ids = append(ids, id) + } + return ids +} + +func (s *Store) Update(id ID, r Record) error { + s.mu.Lock() + defer s.mu.Unlock() + + if _, exists := s.records[id]; !exists { + return fmt.Errorf("updating record: %w", ErrRecordNotFound) + } + + s.records[id] = r + return nil +} + +func (s *Store) Delete(id ID) error { + s.mu.Lock() + defer s.mu.Unlock() + + if _, exists := s.records[id]; !exists { + return fmt.Errorf("deleting record: %w", ErrRecordNotFound) + } + + delete(s.records, id) + return nil +} + +// Need to create a huge record. +type Record struct { + Name string + SourceFile string + CompressedFile string + VQMResultFile string + Cmd string + HStime string + HUtime string + HElapsed string + Stime time.Duration + Utime time.Duration + Elapsed time.Duration + MaxRss int64 + VideoDuration float64 + AvgEncodingSpeed float64 + + PSNRMin float64 + PSNRMax float64 + PSNRMean float64 + PSNRHarmonicMean float64 + PSNRStDev float64 + PSNRVariance float64 + + MS_SSIMMin float64 + MS_SSIMMax float64 + MS_SSIMMean float64 + MS_SSIMHarmonicMean float64 + MS_SSIMStDev float64 + MS_SSIMVariance float64 + + VMAFMin float64 + VMAFMax float64 + VMAFMean float64 + VMAFHarmonicMean float64 + VMAFStDev float64 + VMAFVariance float64 +} diff --git a/internal/metric/store_test.go b/internal/metric/store_test.go new file mode 100644 index 0000000..5fd9605 --- /dev/null +++ b/internal/metric/store_test.go @@ -0,0 +1,148 @@ +// Copyright ©2022 Evolution. All rights reserved. +// Use of this source code is governed by a MIT-style +// license that can be found in the LICENSE file. + +package metric + +import ( + "fmt" + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/assert" +) + +// Number of iterations for stress scenarios. +var stressIter int = 1_000_000 + +func Test_Store_HappyPath(t *testing.T) { + store := NewStore() + + var id1, id2 ID + var r1, r2 Record + r1 = Record{Name: "first"} + r2 = Record{Name: "second"} + + // Insertion works as expected. + id1 = store.Insert(r1) + id2 = store.Insert(r2) + + t.Run("Retrieve all inserted IDs", func(t *testing.T) { + ids := store.GetIDs() + assert.ElementsMatch(t, []ID{id1, id2}, ids) + }) + + t.Run("Inserted records exist", func(t *testing.T) { + // Inserted records Exist! + assert.True(t, store.Exists(id1)) + assert.True(t, store.Exists(id2)) + }) + + t.Run("Inserted records can be retrieved", func(t *testing.T) { + gotR1, err := store.Get(id1) + assert.NoError(t, err) + assert.Equal(t, r1, gotR1) + gotR2, err := store.Get(id2) + assert.NoError(t, err) + assert.Equal(t, r2, gotR2) + }) + + t.Run("Update existing record", func(t *testing.T) { + new := Record{Name: "new name"} + // Check that before update the new and old really are not equal. + old, _ := store.Get(id1) + assert.NotEqual(t, old, new) + + // Now we do the update. + err := store.Update(id1, new) + assert.NoError(t, err) + // Retrieve updated record an compare, they should be equal. + updated, _ := store.Get(id1) + assert.Equal(t, new, updated) + }) + + t.Run("Delete record", func(t *testing.T) { + id := store.Insert(Record{Name: "delete this record"}) + assert.True(t, store.Exists(id)) + + err := store.Delete(id) + assert.NoError(t, err) + assert.False(t, store.Exists(id)) + }) +} + +func Test_Store_SadPath(t *testing.T) { + store := NewStore() + nonExistentID := ID(100) + + t.Run("Error retrieving non-existent record", func(t *testing.T) { + // Check that non existent record is indeed non-existent. + assert.False(t, store.Exists(nonExistentID)) + _, err := store.Get(nonExistentID) + assert.ErrorIs(t, err, ErrRecordNotFound) + }) + + t.Run("Error updating non-existent record", func(t *testing.T) { + err := store.Update(nonExistentID, Record{Name: "update"}) + assert.ErrorIs(t, err, ErrRecordNotFound) + }) + + t.Run("Error deleting non-existent record", func(t *testing.T) { + err := store.Delete(nonExistentID) + assert.ErrorIs(t, err, ErrRecordNotFound) + }) +} + +func Test_Store_StressInsertDelete(t *testing.T) { + var wg sync.WaitGroup + var errCounter atomic.Int64 + store := NewStore() + // Insert part stressing. + for i := 0; i < stressIter; i++ { + wg.Add(1) + go func(iter int) { + defer wg.Done() + store.Insert(Record{Name: fmt.Sprintf("iter %d", iter)}) + }(i) + } + wg.Wait() + + assert.Len(t, store.records, stressIter) + assert.Len(t, store.GetIDs(), stressIter) + // Delete part stressing. + for _, id := range store.GetIDs() { + wg.Add(1) + go func(id ID) { + defer wg.Done() + if err := store.Delete(id); err != nil { + errCounter.Add(1) + } + }(id) + } + wg.Wait() + + if cnt := errCounter.Load(); cnt != 0 { + t.Errorf("Stress Delete caused %d errors", cnt) + } +} + +func Test_Store_StressUpdate(t *testing.T) { + var wg sync.WaitGroup + var errCounter atomic.Int64 + store := NewStore() + id := store.Insert(Record{Name: "first"}) + + for i := 0; i < stressIter; i++ { + wg.Add(1) + go func(iter int) { + defer wg.Done() + if err := store.Update(id, Record{Name: fmt.Sprintf("update %d", iter)}); err != nil { + errCounter.Add(1) + } + }(i) + } + if cnt := errCounter.Load(); cnt != 0 { + t.Errorf("Stress Update caused %d errors", cnt) + } +} diff --git a/internal/tools/ffmpeg.go b/internal/tools/ffmpeg.go index 87f1f9b..ccfefb2 100644 --- a/internal/tools/ffmpeg.go +++ b/internal/tools/ffmpeg.go @@ -54,15 +54,6 @@ func FfprobePath() (string, error) { func FfprobeExtractMetadata(videoFile string) (video.Metadata, error) { var vmeta video.Metadata - type metadata struct { - CodecName string `json:"codec_name,omitempty"` - FrameRate string `json:"r_frame_rate,omitempty"` - Duration float64 `json:"duration,omitempty,string"` - Width int `json:"width,omitempty"` - Height int `json:"height,omitempty"` - BitRate int `json:"bit_rate,omitempty,string"` - } - if _, err := os.Stat(videoFile); os.IsNotExist(err) { return vmeta, fmt.Errorf("FfprobeExtractMetadata() os.Stat: %w", err) } @@ -86,16 +77,26 @@ func FfprobeExtractMetadata(videoFile string) (video.Metadata, error) { return vmeta, fmt.Errorf("FfprobeExtractMetadata() exec error: %w", err) } + // A temporary structures to unmarshal JSON from ffprobe output. + type metadata struct { + CodecName string `json:"codec_name,omitempty"` + FrameRate string `json:"r_frame_rate,omitempty"` + Duration float64 `json:"duration,omitempty,string"` + Width int `json:"width,omitempty"` + Height int `json:"height,omitempty"` + BitRate int `json:"bit_rate,omitempty,string"` + FrameCount int `json:"nb_frames,omitempty,string"` + } // Unmarshal metadata from both "streams" and "format" JSON objects. meta := &struct { Streams []metadata Format metadata }{} - if err := json.Unmarshal(out, &meta); err != nil { return vmeta, fmt.Errorf("FfprobeExtractMetadata() json.Unmarshal: %w", err) } logging.Debugf("%s %+v", videoFile, meta) + vmeta = video.Metadata(meta.Streams[0]) // For mkv container Streams does not contain duration, so we have to look into Format. vmeta.Duration = math.Max(vmeta.Duration, meta.Format.Duration) diff --git a/internal/tools/ffmpeg_test.go b/internal/tools/ffmpeg_test.go index 5c1e608..63583a3 100644 --- a/internal/tools/ffmpeg_test.go +++ b/internal/tools/ffmpeg_test.go @@ -85,12 +85,13 @@ func Test_FfprobeExtractMetadata(t *testing.T) { videoFile := "../../testdata/video/testsrc02.mp4" t.Run("Should extract VideoMetadata from video file", func(t *testing.T) { want := video.Metadata{ - Duration: 10, - Width: 1280, - Height: 720, - BitRate: 86740, - CodecName: "h264", - FrameRate: "24/1", + Duration: 10, + Width: 1280, + Height: 720, + BitRate: 86740, + FrameCount: 240, + CodecName: "h264", + FrameRate: "24/1", } got, err := FfprobeExtractMetadata(videoFile) diff --git a/internal/video/metadata.go b/internal/video/metadata.go index fe331ba..f897774 100644 --- a/internal/video/metadata.go +++ b/internal/video/metadata.go @@ -8,12 +8,13 @@ package video // Metadata type contains useful video stream metadata. type Metadata struct { - CodecName string `json:"codec_name,omitempty"` - FrameRate string `json:"r_frame_rate,omitempty"` - Duration float64 `json:"duration,omitempty,string"` - Width int `json:"width,omitempty"` - Height int `json:"height,omitempty"` - BitRate int `json:"bit_rate,omitempty,string"` + CodecName string + FrameRate string + Duration float64 + Width int + Height int + BitRate int + FrameCount int } // MetadataExtractor is the interface that wraps ExtractMetadata method. diff --git a/internal/vqm/frame.go b/internal/vqm/frame.go index 228eaf0..902cd41 100644 --- a/internal/vqm/frame.go +++ b/internal/vqm/frame.go @@ -22,23 +22,6 @@ type FrameMetric struct { type FrameMetrics []FrameMetric -func (fm *FrameMetrics) FromJSON(r io.Reader) error { - data, err := io.ReadAll(r) - if err != nil { - return fmt.Errorf("FromJSON() Read from io.Reader: %w", err) - } - - if err := json.Unmarshal(data, fm); err != nil { - return fmt.Errorf("FromJSON() JSON unmarshal: %w", err) - } - - return nil -} - -func (fm *FrameMetrics) Get() []FrameMetric { - return []FrameMetric(*fm) -} - // FromFfmpegVMAF will Unmarshal libvmaf's JSON into FrameMetrics. func (fm *FrameMetrics) FromFfmpegVMAF(jsonReader io.Reader) error { b, err := io.ReadAll(jsonReader) @@ -61,16 +44,3 @@ func (fm *FrameMetrics) FromFfmpegVMAF(jsonReader io.Reader) error { } return nil } - -func (fm *FrameMetrics) ToJSON(w io.Writer) error { - jDoc, err := json.MarshalIndent(fm, "", " ") - if err != nil { - return fmt.Errorf("ToJSON() marshal: %w", err) - } - - if _, err := w.Write(jDoc); err != nil { - return fmt.Errorf("ToJSON() write to Writer: %w", err) - } - - return nil -} diff --git a/internal/vqm/frame_test.go b/internal/vqm/frame_test.go index f27523c..55ba91b 100644 --- a/internal/vqm/frame_test.go +++ b/internal/vqm/frame_test.go @@ -17,9 +17,7 @@ import ( var ( metricsFile = "../../testdata/vqm/libvmaf_v2.3.1.json" // Expected count of metrics from metricsFile. - wantMetricCount = 10 - frameMetricsFile = "../../testdata/vqm/frame_metrics.json" - wantFrameMetricsCount = 2158 + wantMetricCount = 10 ) func fixLoadVmafJSONMetrics(t *testing.T) io.Reader { @@ -47,47 +45,3 @@ func TestFrameMetrics_FromFfmpegVMAF(t *testing.T) { } }) } - -func TestFrameMetrics_ToJSON(t *testing.T) { - // Check To/FromJSON round trip. - var ( - gotJSON bytes.Buffer - metrics FrameMetrics - ) - - t.Run("Marshal-unmarshal roundtrip should work", func(t *testing.T) { - wantJSON, err := os.ReadFile(frameMetricsFile) - require.NoError(t, err) - - // Unmarshal into FrameMetrics: JSON -> FrameMetrics. - err2 := metrics.FromJSON(bytes.NewBuffer(wantJSON)) - require.NoError(t, err2) - assert.Len(t, metrics, wantFrameMetricsCount, "FrameMetrics count mismatch") - - // Marshal back to JSON: FrameMetrics -> JSON. - err3 := metrics.ToJSON(&gotJSON) - require.NoError(t, err3) - - assert.JSONEq(t, string(wantJSON), gotJSON.String()) - }) -} - -func TestFrameMetrics_FromJSON(t *testing.T) { - t.Run("Should Unmarshal from valid JSON into FrameMetrics", func(t *testing.T) { - var fm FrameMetrics - j, err := os.Open(frameMetricsFile) - require.NoError(t, err) - - err2 := fm.FromJSON(j) - require.NoError(t, err2) - - assert.Len(t, fm, wantFrameMetricsCount) - }) - t.Run("Unsuccessful Unmarshal from empty", func(t *testing.T) { - var fm FrameMetrics - - j := bytes.NewBuffer([]byte{}) - err := fm.FromJSON(j) - assert.Error(t, err) - }) -} diff --git a/internal/vqm/vqm.go b/internal/vqm/vqm.go index 5c47b48..74dbdd1 100644 --- a/internal/vqm/vqm.go +++ b/internal/vqm/vqm.go @@ -18,7 +18,10 @@ import ( "text/template" "github.com/evolution-gaming/ease/internal/logging" + "github.com/evolution-gaming/ease/internal/tools" "github.com/google/shlex" + "gonum.org/v1/gonum/floats" + "gonum.org/v1/gonum/stat" ) var DefaultFfmpegVMAFTemplate = "-hide_banner -i {{.CompressedFile}} -i {{.SourceFile}} " + @@ -30,23 +33,7 @@ var DefaultFfmpegVMAFTemplate = "-hide_banner -i {{.CompressedFile}} -i {{.Sourc type Measurer interface { // Measure should run actual VQM measuring process Measure() error - // GetResult will retrieve VQM measurement Result - GetResult() (Result, error) -} - -// Result represents Measurer tool execution result. -type Result struct { - SourceFile string - CompressedFile string - ResultFile string - Metrics VideoQualityMetrics -} - -// VideoQualityMetrics is a struct of meaningful Video Quality Metrics. -type VideoQualityMetrics struct { - PSNR float64 - MS_SSIM float64 - VMAF float64 + GetMetrics() (*AggregateMetric, error) } // FfmpegVMAFConfig exposes parameters for ffmpegVMAF creation. @@ -125,61 +112,103 @@ type ffmpegVMAF struct { } func (f *ffmpegVMAF) Measure() error { + var err error + if f.measured { return errors.New("Measure() already executed") } + + // First we should check if source and compressed files have equal number of + // frames, if it is not the case - then VQM will be off. + srcMeta, err := tools.FfprobeExtractMetadata(f.sourceFile) + if err != nil { + return fmt.Errorf("source file metadata: %w", err) + } + compressedMeta, err := tools.FfprobeExtractMetadata(f.compressedFile) + if err != nil { + return fmt.Errorf("compressed file metadata: %w", err) + } + if srcMeta.FrameCount != compressedMeta.FrameCount { + return fmt.Errorf("frame count mismatch: source %v != compressed %v", srcMeta.FrameCount, compressedMeta.FrameCount) + } + cmd := exec.Command(f.exePath, f.ffmpegArgs...) //#nosec G204 logging.Debugf("VQM tool command: %v", cmd.Args) - var err error f.output, err = cmd.CombinedOutput() if err != nil { logging.Infof("VQM tool execution failure:\n%s", cmd.String()) logging.Infof("VQM tool output:\n%s", f.output) return fmt.Errorf("VQM calculation error: %w", err) } + f.measured = true return nil } -func (f *ffmpegVMAF) GetResult() (Result, error) { - var vqr Result +type AggregateMetric struct { + VMAF Metric + PSNR Metric + MS_SSIM Metric +} + +type Metric struct { + Mean float64 + HarmonicMean float64 + Min float64 + Max float64 + StDev float64 + Variance float64 +} - // Depend on Measure() being executed. +func (f *ffmpegVMAF) GetMetrics() (*AggregateMetric, error) { if !f.measured { - return vqr, errors.New("GetResult() depends on Measure() called first") + return nil, errors.New("GetMetrics() depends on Measure() called first") } - resData, err := os.ReadFile(f.resultFile) + am := &AggregateMetric{} + // Unmarshal metrics from result file. + j, err := os.Open(f.resultFile) if err != nil { - return vqr, fmt.Errorf("VideoQualityTool.GetResult() reading %s: %w", f.resultFile, err) + return nil, fmt.Errorf("opening file: %w", err) } - vqm, err := f.unmarshalResultJSON(resData) - if err != nil { - return vqr, fmt.Errorf("VideoQualityTool.GetResult() in resultParser(): %w", err) - } - vqr = Result{ - Metrics: vqm, - SourceFile: f.sourceFile, - CompressedFile: f.compressedFile, - ResultFile: f.resultFile, - } - return vqr, nil -} -// unmarshalResultJSON will unmarshal libvmaf JSON result to VideoQualityMetrics. -func (f *ffmpegVMAF) unmarshalResultJSON(data []byte) (VideoQualityMetrics, error) { - var vqm VideoQualityMetrics - res := &ffmpegVMAFResult{} - - if err := json.Unmarshal(data, res); err != nil { - return vqm, fmt.Errorf("parseResult() unmarshal JSON: %w", err) + var metrics FrameMetrics + err2 := metrics.FromFfmpegVMAF(j) + if err2 != nil { + return nil, fmt.Errorf("parsing JSON: %w", err2) } - vqm = VideoQualityMetrics{ - VMAF: res.PooledMetrics.VMAF.Mean, - PSNR: res.PooledMetrics.PSNR.Mean, - MS_SSIM: res.PooledMetrics.MS_SSIM.Mean, + + // Convert to vectors to apply aggregations. + m := struct { + VMAF []float64 + PSNR []float64 + MS_SSIM []float64 + }{} + for _, v := range metrics { + m.VMAF = append(m.VMAF, v.VMAF) + m.PSNR = append(m.PSNR, v.PSNR) + m.MS_SSIM = append(m.MS_SSIM, v.MS_SSIM) } - return vqm, nil + + am.VMAF.Min = floats.Min(m.VMAF) + am.VMAF.Max = floats.Max(m.VMAF) + am.VMAF.HarmonicMean = stat.HarmonicMean(m.VMAF, nil) + am.VMAF.Variance = stat.Variance(m.VMAF, nil) + am.VMAF.Mean, am.VMAF.StDev = stat.MeanStdDev(m.VMAF, nil) + + am.PSNR.Min = floats.Min(m.PSNR) + am.PSNR.Max = floats.Max(m.PSNR) + am.PSNR.HarmonicMean = stat.HarmonicMean(m.PSNR, nil) + am.PSNR.Variance = stat.Variance(m.PSNR, nil) + am.PSNR.Mean, am.PSNR.StDev = stat.MeanStdDev(m.PSNR, nil) + + am.MS_SSIM.Min = floats.Min(m.MS_SSIM) + am.MS_SSIM.Max = floats.Max(m.MS_SSIM) + am.MS_SSIM.HarmonicMean = stat.HarmonicMean(m.MS_SSIM, nil) + am.MS_SSIM.Variance = stat.Variance(m.MS_SSIM, nil) + am.MS_SSIM.Mean, am.MS_SSIM.StDev = stat.MeanStdDev(m.MS_SSIM, nil) + + return am, nil } // This and following are helper structs for libvmaf JSON result. diff --git a/internal/vqm/vqm_test.go b/internal/vqm/vqm_test.go index e1e8465..1dd428f 100644 --- a/internal/vqm/vqm_test.go +++ b/internal/vqm/vqm_test.go @@ -25,13 +25,13 @@ import ( var saveResultFile = flag.Bool("save-result", false, "Save result file") func TestFfmpegVMAFImplementsMeasurer(t *testing.T) { - // Test that tool implement Measurer interface. + // Test that tool implements Measurer interface. var _ Measurer = &ffmpegVMAF{} } func TestFfmpegVMAF(t *testing.T) { var tool Measurer // tool under test - var result Result // result from tool under test + var aggMetrics *AggregateMetric wrkDir := t.TempDir() ffmpegExePath, _ := tools.FfmpegPath() @@ -61,19 +61,46 @@ func TestFfmpegVMAF(t *testing.T) { assert.NoError(t, err) }) - t.Run("Call GetResult()", func(t *testing.T) { + t.Run("Call GetMetrics()", func(t *testing.T) { var err error - result, err = tool.GetResult() + aggMetrics, err = tool.GetMetrics() assert.NoError(t, err) }) - t.Run("VideoQualityResult should have metrics", func(t *testing.T) { - assert.NotEqual(t, result.Metrics.VMAF, 0, "No VMAF metric detected") - assert.NotEqual(t, result.Metrics.PSNR, 0, "No PSNR metric detected") - assert.NotEqual(t, result.Metrics.MS_SSIM, 0, "No MS-SSIM metric detected") + t.Run("Aggregate metrics should be non-zero", func(t *testing.T) { + assert.NotEqual(t, aggMetrics.VMAF.Mean, float64(0), "No VMAF metric detected") + assert.NotEqual(t, aggMetrics.PSNR.Mean, float64(0), "No PSNR metric detected") }) } +func TestFfmpegVMAF_WithMSSSIM(t *testing.T) { + ffmpegExePath, _ := tools.FfmpegPath() + libvmafModelPath, _ := tools.FindLibvmafModel() + srcFile := "../../testdata/video/testsrc01.mp4" + compressedFile := "../../testdata/video/testsrc01.mp4" + + // Enable MS-SSIM calculation feature, which is not enabled by default. + ffmpegVMAFTemplate := "-hide_banner -i {{.CompressedFile}} -i {{.SourceFile}} " + + "-lavfi libvmaf=n_subsample=1:log_path={{.ResultFile}}:feature=name=psnr|name=float_ms_ssim:" + + "log_fmt=json:model=path={{.ModelPath}}:n_threads={{.NThreads}} -f null -" + + tool, err := NewFfmpegVMAF(&FfmpegVMAFConfig{ + FfmpegPath: ffmpegExePath, + LibvmafModelPath: libvmafModelPath, + FfmpegVMAFTemplate: ffmpegVMAFTemplate, + ResultFile: path.Join(t.TempDir(), "result_2.json"), + }, compressedFile, srcFile) + assert.NoError(t, err) + + assert.NoError(t, tool.Measure()) + + aggMetrics, err := tool.GetMetrics() + assert.NoError(t, err) + assert.NotEqual(t, aggMetrics.VMAF.Mean, float64(0), "No VMAF metric detected") + assert.NotEqual(t, aggMetrics.PSNR.Mean, float64(0), "No PSNR metric detected") + assert.NotEqual(t, aggMetrics.MS_SSIM.Mean, float64(0), "No MS-SSIM metric detected") +} + func TestFfmpegVMAF_Negative(t *testing.T) { ffmpegExePath, _ := tools.FfmpegPath() libvmafModelPath, _ := tools.FindLibvmafModel() @@ -112,10 +139,10 @@ func TestFfmpegVMAF_Negative(t *testing.T) { return tool } - t.Run("Call GetResult() before Measure() should error", func(t *testing.T) { - wantErrMsg := "GetResult() depends on Measure() called first" + t.Run("Call GetMetrics() before Measure() should error", func(t *testing.T) { + wantErrMsg := "GetMetrics() depends on Measure() called first" tool := getValidTool() - _, err := tool.GetResult() + _, err := tool.GetMetrics() require.Error(t, err) assert.ErrorContains(t, err, wantErrMsg) }) diff --git a/run.go b/run.go index 6bf60bf..000f1f2 100644 --- a/run.go +++ b/run.go @@ -7,7 +7,7 @@ package main import ( - "encoding/json" + "encoding/csv" "errors" "flag" "fmt" @@ -19,7 +19,9 @@ import ( "github.com/evolution-gaming/ease/internal/analysis" "github.com/evolution-gaming/ease/internal/encoding" "github.com/evolution-gaming/ease/internal/logging" + "github.com/evolution-gaming/ease/internal/metric" "github.com/evolution-gaming/ease/internal/vqm" + "github.com/jszwec/csvutil" ) // CreateRunCommand will create Commander instance from App. @@ -33,8 +35,9 @@ Examples: ease run -plan plan.json -out-dir path/to/output/dir` app := &App{ - fs: flag.NewFlagSet("run", flag.ContinueOnError), - gf: globalFlags{}, + fs: flag.NewFlagSet("run", flag.ContinueOnError), + gf: globalFlags{}, + mStore: metric.NewStore(), } app.gf.Register(app.fs) app.fs.StringVar(&app.flPlan, "plan", "", "Encoding plan configuration file") @@ -64,6 +67,8 @@ type App struct { gf globalFlags // Dry run mode flag flDryRun bool + // Encoding and VQ metric store + mStore *metric.Store } // init will do App state initialization. @@ -122,26 +127,48 @@ func (a *App) init(args []string) error { } // encode will run encoding stage of plan execution. -func (a *App) encode(plan encoding.Plan) (*report, error) { - rep := &report{} - +func (a *App) encode(plan encoding.Plan) error { result, err := plan.Run() // Make sure to log any errors from RunResults. if ur := unrollResultErrors(result.RunResults); ur != "" { logging.Infof("Run had following ERRORS:\n%s", ur) } if err != nil { - return rep, err + return fmt.Errorf("plan run: %w", err) + } + + // Store encoding related metrics into mStore. + for _, res := range result.RunResults { + id := a.mStore.Insert(metric.Record{ + Name: res.Name, + SourceFile: res.SourceFile, + CompressedFile: res.CompressedFile, + Cmd: res.Cmd, + HStime: res.Stats.HStime, + HUtime: res.Stats.HUtime, + HElapsed: res.Stats.HElapsed, + Stime: res.Stats.Stime, + Utime: res.Stats.Utime, + Elapsed: res.Stats.Elapsed, + MaxRss: res.Stats.MaxRss, + VideoDuration: res.VideoDuration, + AvgEncodingSpeed: res.AvgEncodingSpeed, + }) + logging.Debugf("Storing record (id=%v) with encoding metrics", id) } - rep.EncodingResult = result // Do VQM calculations for encoded videos. var vqmFailed bool = false + for _, id := range a.mStore.GetIDs() { + record, err := a.mStore.Get(id) + if err != nil { + vqmFailed = true + logging.Infof("Error retrieving record from metric store: %s", err) + continue + } - for i := range result.RunResults { - r := &result.RunResults[i] - resFile := strings.TrimSuffix(r.CompressedFile, filepath.Ext(r.CompressedFile)) + "_vqm.json" - + // Derive result file path. + resFile := strings.TrimSuffix(record.CompressedFile, filepath.Ext(record.CompressedFile)) + "_vqm.json" // Create VMAF tool configuration. vmafCfg := vqm.FfmpegVMAFConfig{ FfmpegPath: a.cfg.FfmpegPath.Value(), @@ -150,74 +177,88 @@ func (a *App) encode(plan encoding.Plan) (*report, error) { ResultFile: resFile, } - vqmTool, err2 := vqm.NewFfmpegVMAF(&vmafCfg, r.CompressedFile, r.SourceFile) + vqmTool, err2 := vqm.NewFfmpegVMAF(&vmafCfg, record.CompressedFile, record.SourceFile) if err2 != nil { vqmFailed = true logging.Infof("Error while initializing VQM tool: %s", err2) continue } - logging.Infof("Start measuring VQMs for %s", r.CompressedFile) + logging.Infof("Start measuring VQMs for %s", record.CompressedFile) if err2 = vqmTool.Measure(); err2 != nil { vqmFailed = true - logging.Infof("Failed calculate VQM for %s due to error: %s", r.CompressedFile, err2) + logging.Infof("Failed calculate VQM for %s due to error: %s", record.CompressedFile, err2) continue } - res, err2 := vqmTool.GetResult() + res, err2 := vqmTool.GetMetrics() if err2 != nil { - logging.Infof("Error while getting VQM result for %s: %s", r.CompressedFile, err2) + vqmFailed = true + logging.Infof("Error while getting metrics for %s: %s", record.CompressedFile, err2) + continue } - rep.VQMResults = append(rep.VQMResults, namedVqmResult{Name: r.Name, Result: res}) - logging.Infof("Done measuring VQMs for %s", r.CompressedFile) + // Update record with VQ metrics. + record.VQMResultFile = resFile + record.PSNRMin = res.PSNR.Min + record.PSNRMax = res.PSNR.Max + record.PSNRMean = res.PSNR.Mean + record.PSNRHarmonicMean = res.PSNR.HarmonicMean + record.PSNRStDev = res.PSNR.StDev + record.PSNRVariance = res.PSNR.Variance + + record.VMAFMin = res.VMAF.Min + record.VMAFMax = res.VMAF.Max + record.VMAFMean = res.VMAF.Mean + record.VMAFHarmonicMean = res.VMAF.HarmonicMean + record.VMAFStDev = res.VMAF.StDev + record.VMAFVariance = res.VMAF.Variance + + record.MS_SSIMMin = res.MS_SSIM.Min + record.MS_SSIMMax = res.MS_SSIM.Max + record.MS_SSIMMean = res.MS_SSIM.Mean + record.MS_SSIMHarmonicMean = res.MS_SSIM.HarmonicMean + record.MS_SSIMStDev = res.MS_SSIM.StDev + record.MS_SSIMVariance = res.MS_SSIM.Variance + + if err := a.mStore.Update(id, record); err != nil { + vqmFailed = true + logging.Infof("Error updating record (id=%v) for %s: %s", id, record.CompressedFile, err2) + continue + } + logging.Debugf("Updating record (id=%v) with VQ metrics", id) + logging.Infof("Done measuring VQMs for %s", record.CompressedFile) } if vqmFailed { - return rep, errors.New("VQM calculations had errors, see log for reasons") + return errors.New("VQM calculations had errors, see log for reasons") } - return rep, nil + return nil } // analyse will run analysis stage of plan execution. -func (a *App) analyse(rep *report) error { - // Extract data to work with. - srcData := extractSourceData(rep) - d, err := json.MarshalIndent(srcData, "", " ") - if err != nil { - return err - } - logging.Debugf("Analysis for:\n%s", d) - - // TODO: this is a good place to do goroutines iterate over sources and do stuff. - - for _, v := range srcData { +func (a *App) analyse() error { + for _, id := range a.mStore.GetIDs() { + v, err := a.mStore.Get(id) + if err != nil { + return fmt.Errorf("fetching record by id (%v): %w", id, err) + } // Create separate dir for results. base := path.Base(v.CompressedFile) base = strings.TrimSuffix(base, path.Ext(base)) logging.Infof("Analysing %s", v.CompressedFile) resDir := path.Join(a.flOutDir, base) - if err := os.MkdirAll(resDir, os.FileMode(0o755)); err != nil { + if err = os.MkdirAll(resDir, os.FileMode(0o755)); err != nil { return fmt.Errorf("creating directory: %w", err) } - compressedFile := v.CompressedFile - vqmFile := v.VqmResultFile - // In case compressed and VQM result file path in not absolute we assume - // it must be relative to WorkDir. - if !path.IsAbs(compressedFile) { - compressedFile = path.Join(v.WorkDir, compressedFile) - } - if !path.IsAbs(vqmFile) { - vqmFile = path.Join(v.WorkDir, vqmFile) - } bitratePlot := path.Join(resDir, base+"_bitrate.png") vmafPlot := path.Join(resDir, base+"_vmaf.png") psnrPlot := path.Join(resDir, base+"_psnr.png") msssimPlot := path.Join(resDir, base+"_ms-ssim.png") - jsonFd, err := os.Open(vqmFile) + jsonFd, err := os.Open(v.VQMResultFile) if err != nil { return fmt.Errorf("opening VQM file: %w", err) } @@ -245,7 +286,7 @@ func (a *App) analyse(rep *report) error { skipPSNR := all(psnrs, 0) skipMSSSIM := all(msssims, 0) - if err := analysis.MultiPlotBitrate(compressedFile, bitratePlot, a.cfg.FfprobePath.Value()); err != nil { + if err := analysis.MultiPlotBitrate(v.CompressedFile, bitratePlot, a.cfg.FfprobePath.Value()); err != nil { return fmt.Errorf("creating bitrate plot: %w", err) } logging.Infof("Bitrate plot done: %s", bitratePlot) @@ -281,41 +322,37 @@ func (a *App) analyse(rep *report) error { return nil } -func (a *App) saveJSONReport(rep *report) error { - // Write report of encoding results. - reportPath := path.Join(a.flOutDir, a.cfg.ReportFileName.Value()) - reportOut, err := os.Create(reportPath) - if err != nil { - return fmt.Errorf("creating report file: %w", err) - } - defer reportOut.Close() - return rep.WriteJSON(reportOut) -} +// saveReport writes recorded metrics to report file. +func (a *App) saveReport() error { + var report []metric.Record -func (a *App) saveCSVReport(rep *report) error { - // Convert report to record-like representation to be written as CSV. - csvReport, err := newCsvReport(rep) - if err != nil { - return fmt.Errorf("converting to record-like representation: %w", err) + for _, id := range a.mStore.GetIDs() { + r, err := a.mStore.Get(id) + if err != nil { + return fmt.Errorf("getting record (id=%v) from metric store: %w", id, err) + } + report = append(report, r) } - // TODO: For time being just replace ".json" extension with ".csv". If need be this - // can be later exposed as separate configuration option. + reportPath := path.Join(a.flOutDir, a.cfg.ReportFileName.Value()) - reportPath = strings.TrimSuffix(reportPath, ".json") + ".csv" reportOut, err := os.Create(reportPath) if err != nil { return fmt.Errorf("creating CSV report file: %w", err) } defer reportOut.Close() - if err := csvReport.WriteCSV(reportOut); err != nil { + w := csv.NewWriter(reportOut) + if err := csvutil.NewEncoder(w).Encode(report); err != nil { return fmt.Errorf("writing CSV report: %w", err) } + w.Flush() + return nil } // Run is main entry point into App execution. func (a *App) Run(args []string) error { + logging.Infof("ease version: %s", vInfo) if err := a.init(args); err != nil { return err } @@ -333,7 +370,12 @@ func (a *App) Run(args []string) error { return &AppError{exitCode: 1, msg: err.Error()} } - plan := encoding.NewPlan(pc, a.flOutDir) + // To avoid ambiguity, resolve output path to absolute representation. + outDirPath, err := filepath.Abs(a.flOutDir) + if err != nil { + return &AppError{exitCode: 1, msg: err.Error()} + } + plan := encoding.NewPlan(pc, outDirPath) // Early return in "dry run" mode. if a.flDryRun { @@ -342,23 +384,20 @@ func (a *App) Run(args []string) error { } // Run encode stage. - rep, err := a.encode(plan) - if err != nil { + if err = a.encode(plan); err != nil { return &AppError{exitCode: 1, msg: err.Error()} } - // Save reports to filesystem. - if err = a.saveJSONReport(rep); err != nil { - return &AppError{exitCode: 1, msg: err.Error()} - } - if err = a.saveCSVReport(rep); err != nil { + // Save report. + if err = a.saveReport(); err != nil { return &AppError{exitCode: 1, msg: err.Error()} } // Run analysis stage. - if err = a.analyse(rep); err != nil { + if err = a.analyse(); err != nil { return &AppError{exitCode: 1, msg: err.Error()} } + logging.Info("Done") return nil }