Skip to content

Commit

Permalink
test(workflow): test resolution of media
Browse files Browse the repository at this point in the history
  • Loading branch information
hbomb79 committed Jul 20, 2024
1 parent 62a8374 commit b8eb8ef
Show file tree
Hide file tree
Showing 16 changed files with 202 additions and 94 deletions.
4 changes: 3 additions & 1 deletion internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ func (service *activityService) Run(ctx context.Context) error {
service.eventBus.RegisterHandlerChannel(messageChan,
event.IngestUpdateEvent, event.IngestCompleteEvent, event.TranscodeUpdateEvent,
event.TranscodeTaskProgressEvent, event.TranscodeCompleteEvent, event.WorkflowUpdateEvent,
event.DownloadUpdateEvent, event.DownloadCompleteEvent, event.DownloadProgressEvent)
event.DownloadUpdateEvent, event.DownloadCompleteEvent, event.DownloadProgressEvent,
event.NewMediaEvent, event.DeleteMediaEvent,
)

log.Emit(logger.NEW, "Activity service started\n")
for {
Expand Down
9 changes: 8 additions & 1 deletion internal/api/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

const (
TitleIngestUpdate = "INGEST_UPDATE"
TitleMediaUpdate = "MEDIA_UPDATE"
TitleTranscodeUpdate = "TRANSCODE_TASK_UPDATE"
TitleTranscodeProgressUpdate = "TRANSCODE_TASK_PROGRESS_UPDATE"
)
Expand Down Expand Up @@ -75,7 +76,13 @@ func (hub *broadcaster) BroadcastWorkflowUpdate(id uuid.UUID) error {
}

func (hub *broadcaster) BroadcastMediaUpdate(id uuid.UUID) error {
return errors.New("not yet implemented")
media := hub.store.GetMedia(id)
hub.broadcast(TitleMediaUpdate, map[string]interface{}{
"media_id": id,
"media": media,
})

return nil
}

// nullsafeNewDto returns nil if the given model is nil, else it will call the
Expand Down
6 changes: 3 additions & 3 deletions internal/api/controllers/ingests/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,13 @@ func scrapedMetadataToDto(metadata *media.FileMediaMetadata) *gen.FileMetadata {
return &gen.FileMetadata{
EpisodeNumber: metadata.EpisodeNumber,
Episodic: metadata.Episodic,
FrameHeight: metadata.FrameH,
FrameWidth: metadata.FrameW,
FrameHeight: &metadata.FrameH,
FrameWidth: &metadata.FrameW,
Path: metadata.Path,
Runtime: metadata.Runtime,
SeasonNumber: metadata.SeasonNumber,
Title: metadata.Title,
Year: metadata.Year,
Year: &metadata.Year,
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/api/controllers/medias/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

type (
Store interface {
GetMedia(mediaID uuid.UUID) *media.Container
GetMovie(movieID uuid.UUID) (*media.Movie, error)
GetEpisode(episodeID uuid.UUID) (*media.Episode, error)
GetInflatedSeries(seriesID uuid.UUID) (*media.InflatedSeries, error)
Expand Down
3 changes: 3 additions & 0 deletions internal/database/migrations/0001_initial.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ CREATE TABLE media(
title TEXT NOT NULL,
adult BOOLEAN NOT NULL,
source_path TEXT NOT NULL,
frame_width INT NOT NULL,
frame_height INT NOT NULL,

-- Nullable columns which must be specified if the media t is episode
episode_number INT CHECK (episode_number IS NULL OR episode_number >= 0),
Expand Down Expand Up @@ -102,6 +104,7 @@ CREATE TABLE workflow_criteria(
match_combine_type INT NOT NULL,
match_value TEXT NOT NULL,
workflow_id UUID NOT NULL,
position INT NOT NULL,

CONSTRAINT workflow_criteria_fk_workflow_id FOREIGN KEY(workflow_id) REFERENCES workflow(id) ON DELETE CASCADE
);
Expand Down
4 changes: 2 additions & 2 deletions internal/http/tmdb/media_conv.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ func TmdbEpisodeToMedia(ep *Episode, isSeasonAdult bool, metadata *media.FileMed
return &media.Episode{
Model: media.Model{ID: uuid.New(), TmdbID: ep.ID.String(), Title: ep.Name},
Watchable: media.Watchable{
MediaResolution: media.MediaResolution{Width: *metadata.FrameW, Height: *metadata.FrameH},
MediaResolution: media.MediaResolution{Width: metadata.FrameW, Height: metadata.FrameH},
SourcePath: metadata.Path,
Adult: isSeasonAdult,
},
Expand Down Expand Up @@ -44,7 +44,7 @@ func TmdbMovieToMedia(movie *Movie, metadata *media.FileMediaMetadata) *media.Mo
Model: media.Model{ID: uuid.New(), TmdbID: movie.ID.String(), Title: movie.Name},
Genres: TmdbGenresToMedia(movie.Genres),
Watchable: media.Watchable{
MediaResolution: media.MediaResolution{Width: *metadata.FrameW, Height: *metadata.FrameH},
MediaResolution: media.MediaResolution{Width: metadata.FrameW, Height: metadata.FrameH},
SourcePath: metadata.Path,
Adult: movie.Adult,
},
Expand Down
4 changes: 2 additions & 2 deletions internal/http/tmdb/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func (searcher *tmdbSearcher) GetSeason(seriesID string, seasonNumber int) (*Sea
// to whittle them down to a singular result. To do so, the year and popularity
// of the results is taken in to consideration.
func (searcher *tmdbSearcher) handleSearchResults(results []SearchResultItem, metadata *media.FileMediaMetadata) (*SearchResultItem, error) {
if metadata.Year != nil {
if metadata.Year != 0 {
if metadata.Episodic {
filterResultsInPlace(&results, metadata, func(resultDate time.Time, metadataDate time.Time) bool {
return resultDate.Compare(metadataDate) >= 0
Expand Down Expand Up @@ -263,7 +263,7 @@ func filterResultsInPlace(results *[]SearchResultItem, metadata *media.FileMedia
return time.Date(year, 1, 1, 0, 0, 0, 0, time.UTC)
}

yearFromMetadata := timeFromYear(*metadata.Year)
yearFromMetadata := timeFromYear(metadata.Year)
insertionIndex := 0
for _, v := range *results {
yearFromResult := timeFromYear(v.effectiveDate().Year())
Expand Down
2 changes: 1 addition & 1 deletion internal/ingest/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (item *IngestItem) ingest(eventBus event.EventCoordinator, scraper Scraper,
} else if meta == nil {
return Trouble{error: errors.New("metadata scrape returned no error, but nil payload received"), tType: MetadataFailure}
} else {
log.Emit(logger.DEBUG, "Scraped metadata for item %s:\n%#v\n", item, meta)
log.Emit(logger.WARNING, "Scraped metadata for item %s:\n%s\n", item, meta)
item.ScrapedMetadata = meta
}
}
Expand Down
12 changes: 6 additions & 6 deletions internal/ingest/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func Test_EpisodeImports_CorrectlySaved(t *testing.T) {
SeasonNumber: 1,
EpisodeNumber: 1,
Runtime: "69420",
Year: &year,
FrameW: &frameSize,
FrameH: &frameSize,
Year: year,
FrameW: frameSize,
FrameH: frameSize,
Path: files[0],
}

Expand Down Expand Up @@ -194,9 +194,9 @@ func Test_MovieImports_CorrectlySaved(t *testing.T) {
Title: "Test Movie",
Episodic: false,
Runtime: "69420",
Year: &year,
FrameW: &frameSize,
FrameH: &frameSize,
Year: year,
FrameW: frameSize,
FrameH: frameSize,
Path: files[0],
}

Expand Down
26 changes: 19 additions & 7 deletions internal/media/scrape.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package media

import (
"errors"
"fmt"
"path/filepath"
"regexp"
"strconv"
Expand All @@ -17,9 +18,9 @@ type (
SeasonNumber int
EpisodeNumber int
Runtime string
Year *int
FrameW *int
FrameH *int
Year int
FrameW int
FrameH int
Path string
}

Expand Down Expand Up @@ -83,7 +84,7 @@ func (scraper *MetadataScraper) extractTitleInformation(title string, output *Fi
output.SeasonNumber = convertToInt(seasonGroups[2])
output.EpisodeNumber = convertToInt(seasonGroups[3])
year := convertToInt(seasonGroups[4])
output.Year = &year
output.Year = year

return nil
}
Expand All @@ -95,7 +96,7 @@ func (scraper *MetadataScraper) extractTitleInformation(title string, output *Fi
output.SeasonNumber = -1
output.EpisodeNumber = -1
year := convertToInt(movieGroups[2])
output.Year = &year
output.Year = year

return nil
}
Expand All @@ -119,8 +120,8 @@ func (scraper *MetadataScraper) extractFfprobeInformation(path string, output *F
width := stream.GetWidth()
height := stream.GetHeight()

output.FrameW = &width
output.FrameH = &height
output.FrameW = width
output.FrameH = height
output.Runtime = metadata.GetFormat().GetDuration()

return nil
Expand All @@ -137,3 +138,14 @@ func convertToInt(input string) int {

return v
}

func (m FileMediaMetadata) String() string {
return fmt.Sprintf(`FileMediaMetadata {
Title = %s,
Episodic? = %v (season = %+v, episode = %+v),
Runtime = %v,
Year = %+v,
Resolution = %+v x %+v,
Path = %s,
}`, m.Title, m.Episodic, m.SeasonNumber, m.EpisodeNumber, m.Runtime, m.Year, m.FrameW, m.FrameH, m.Path)
}
27 changes: 14 additions & 13 deletions internal/media/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type (
}

MediaResolution struct {
Width int
Height int
Width int `db:"frame_width"`
Height int `db:"frame_height"`
}

// Season represents the information Thea stores about a season
Expand Down Expand Up @@ -167,12 +167,12 @@ type Store struct{ mediaGenreStore }
func (store *Store) SaveMovie(db database.Queryable, movie *Movie) error {
var updatedMovie Movie
if err := db.QueryRowx(`
INSERT INTO media(id, type, tmdb_id, title, adult, source_path, created_at, updated_at)
VALUES($1, $2, $3, $4, $5, $6, current_timestamp, current_timestamp)
INSERT INTO media(id, type, tmdb_id, title, adult, source_path, frame_width, frame_height, created_at, updated_at)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, current_timestamp, current_timestamp)
ON CONFLICT(tmdb_id, type) DO UPDATE
SET (updated_at, title, adult, source_path) = (current_timestamp, EXCLUDED.title, EXCLUDED.adult, EXCLUDED.source_path)
RETURNING id, tmdb_id, title, adult, source_path, created_at, updated_at;
`, movie.ID, "movie", movie.TmdbID, movie.Title, movie.Adult, movie.SourcePath).StructScan(&updatedMovie); err != nil {
SET (updated_at, title, adult, source_path, frame_width, frame_height) = (current_timestamp, EXCLUDED.title, EXCLUDED.adult, EXCLUDED.source_path, EXCLUDED.frame_width, EXCLUDED.frame_height)
RETURNING id, tmdb_id, title, adult, source_path, created_at, updated_at, frame_width, frame_height;
`, movie.ID, "movie", movie.TmdbID, movie.Title, movie.Adult, movie.SourcePath, movie.Width, movie.Height).StructScan(&updatedMovie); err != nil {
return err
}

Expand Down Expand Up @@ -237,13 +237,14 @@ func (store *Store) SaveSeason(db database.Queryable, season *Season) error {
func (store *Store) SaveEpisode(db database.Queryable, episode *Episode) error {
var updatedEpisode Episode
if err := db.QueryRowx(`
INSERT INTO media(id, type, tmdb_id, episode_number, title, source_path, season_id, adult, created_at, updated_at)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, current_timestamp, current_timestamp)
INSERT INTO media(id, type, tmdb_id, episode_number, title, source_path, season_id, adult, frame_width, frame_height, created_at, updated_at)
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, current_timestamp, current_timestamp)
ON CONFLICT(tmdb_id, type) DO UPDATE
SET (episode_number, title, source_path, season_id, updated_at, adult) =
(EXCLUDED.episode_number, EXCLUDED.title, EXCLUDED.source_path, EXCLUDED.season_id, current_timestamp, EXCLUDED.adult)
RETURNING id, tmdb_id, episode_number, title, source_path, season_id, adult, created_at, updated_at;
`, episode.ID, "episode", episode.TmdbID, episode.EpisodeNumber, episode.Title, episode.SourcePath, episode.SeasonID, episode.Adult).StructScan(&updatedEpisode); err != nil {
SET (episode_number, title, source_path, season_id, updated_at, adult, frame_width, frame_height) =
(EXCLUDED.episode_number, EXCLUDED.title, EXCLUDED.source_path, EXCLUDED.season_id, current_timestamp, EXCLUDED.adult, EXCLUDED.frame_width, EXCLUDED.frame_height)
RETURNING id, tmdb_id, episode_number, title, source_path, season_id, adult, frame_width, frame_height, created_at, updated_at;
`, episode.ID, "episode", episode.TmdbID, episode.EpisodeNumber, episode.Title, episode.SourcePath, episode.SeasonID, episode.Adult, episode.Width, episode.Height).
StructScan(&updatedEpisode); err != nil {
return err
}

Expand Down
47 changes: 39 additions & 8 deletions internal/workflow/store.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package workflow

import (
"cmp"
"fmt"
"slices"
"time"

"github.com/google/uuid"
Expand All @@ -18,10 +20,15 @@ type (
CreatedAt time.Time `db:"created_at"`
Enabled bool `db:"enabled"`
Label string `db:"label"`
Criteria database.JSONColumn[[]match.Criteria] `db:"criteria"`
Criteria database.JSONColumn[[]criteriaModel] `db:"criteria"`
Targets database.JSONColumn[[]*ffmpeg.Target] `db:"targets"`
}

criteriaModel struct {
match.Criteria
Position int `db:"position" json:"position"`
}

workflowTargetAssoc struct {
ID uuid.UUID `db:"id"`
WorkflowID uuid.UUID `db:"workflow_id"`
Expand Down Expand Up @@ -91,20 +98,27 @@ func (store *Store) UpdateWorkflowTx(tx *sqlx.Tx, workflowID uuid.UUID, newLabel
// NOTE: This action is intended to be used as part of an over-arching transaction; user-story
// for updating a workflow should consider all related data too.
func (store *Store) UpdateWorkflowCriteriaTx(tx *sqlx.Tx, workflowID uuid.UUID, criteria []match.Criteria) error {
type orderedCriteria struct {
match.Criteria
Position int `db:"position"`
}

criteriaIDs := make([]uuid.UUID, len(criteria))
toInsert := make([]orderedCriteria, 0, len(criteria))
for i, v := range criteria {
criteriaIDs[i] = v.ID
toInsert = append(toInsert, orderedCriteria{v, i})
}

// Insert workflow criteria, updating existing criteria
if len(criteria) > 0 {
if _, err := tx.NamedExec(`
INSERT INTO workflow_criteria(id, created_at, updated_at, match_key, match_type, match_combine_type, match_value, workflow_id)
VALUES(:id, current_timestamp, current_timestamp, :match_key, :match_type, :match_combine_type, :match_value, '`+workflowID.String()+`')
INSERT INTO workflow_criteria(id, created_at, updated_at, match_key, match_type, match_combine_type, match_value, workflow_id, position)
VALUES(:id, current_timestamp, current_timestamp, :match_key, :match_type, :match_combine_type, :match_value, '`+workflowID.String()+`', :position)
ON CONFLICT(id) DO UPDATE
SET (updated_at, match_key, match_type, match_combine_type, match_value) =
(current_timestamp, EXCLUDED.match_key, EXCLUDED.match_type, EXCLUDED.match_combine_type, EXCLUDED.match_value)
`, criteria); err != nil {
SET (updated_at, match_key, match_type, match_combine_type, match_value, position) =
(current_timestamp, EXCLUDED.match_key, EXCLUDED.match_type, EXCLUDED.match_combine_type, EXCLUDED.match_value, EXCLUDED.position)
`, toInsert); err != nil {
return err
}

Expand Down Expand Up @@ -161,7 +175,7 @@ func (store *Store) Get(db database.Queryable, id uuid.UUID) *Workflow {
return nil
}

return &Workflow{dest.ID, dest.Enabled, dest.Label, *dest.Criteria.Get(), *dest.Targets.Get()}
return &Workflow{dest.ID, dest.Enabled, dest.Label, processCriteriaModels(*dest.Criteria.Get()), *dest.Targets.Get()}
}

// GetAll queries the database for all workflows, and all the related information.
Expand All @@ -177,7 +191,7 @@ func (store *Store) GetAll(db database.Queryable) []*Workflow {

output := make([]*Workflow, len(dest))
for i, v := range dest {
output[i] = &Workflow{v.ID, v.Enabled, v.Label, *v.Criteria.Get(), *v.Targets.Get()}
output[i] = &Workflow{v.ID, v.Enabled, v.Label, processCriteriaModels(*v.Criteria.Get()), *v.Targets.Get()}
}
return output
}
Expand Down Expand Up @@ -218,3 +232,20 @@ func buildWorkflowTargetAssocs(workflowID uuid.UUID, targetIDs []uuid.UUID) []wo

return assocs
}

func processCriteriaModels(models []criteriaModel) []match.Criteria {
slices.SortFunc(models, func(a, b criteriaModel) int { return cmp.Compare(a.Position, b.Position) })

out := make([]match.Criteria, len(models))
for i, v := range models {
out[i] = match.Criteria{
ID: v.ID,
Key: v.Key,
Type: v.Type,
Value: v.Value,
CombineType: v.CombineType,
}
}

return out
}
2 changes: 1 addition & 1 deletion internal/workflow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (workflow *Workflow) IsMediaEligible(media *media.Container) bool {

fmt.Fprintf(debugStr, "%v", isMatch)
if condition.CombineType == match.OR {
if currentEval {
if currentEval && isMatch {
// End of this block, if the current block
// is satisfied, then we're done, no need to
// test the following conditions
Expand Down
19 changes: 19 additions & 0 deletions tests/helpers/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,25 @@ func MatchIngestUpdate(path string, state ingest.IngestItemState) chanassert.Mat
})
}

func MatchMovieEvent(path string) chanassert.Matcher[websocket.SocketMessage] {
return chanassert.MatchPredicate(func(message websocket.SocketMessage) bool {
if message.Title != "MEDIA_UPDATE" {
return false
}

movie, ok := message.Body["media"].(map[string]any)["Movie"].(map[string]any)
if !ok {
return false
}

return movie["SourcePath"] == path
})
}

func MatchMessageTitle(title string) chanassert.Matcher[websocket.SocketMessage] {
return chanassert.MatchStructPartial(websocket.SocketMessage{Title: title})
}

// type wrapMatcher[T any] struct {
// matcher chanassert.Matcher[T]
// latch bool
Expand Down
Loading

0 comments on commit b8eb8ef

Please sign in to comment.