Skip to content

Commit

Permalink
test: add ingest integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hbomb79 committed May 15, 2024
1 parent 6b7c09d commit af48a48
Show file tree
Hide file tree
Showing 10 changed files with 178 additions and 75 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ require (

require (
github.com/golang-jwt/jwt/v5 v5.0.0
github.com/hbomb79/go-chanassert v0.1.0
github.com/hbomb79/go-chanassert v0.2.0
)

require (
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hbomb79/go-chanassert v0.1.0 h1:lOiXhgMMJplxYMmpxVvU28felwqNoyMQOWI9YT7WFPo=
github.com/hbomb79/go-chanassert v0.1.0/go.mod h1:r47M3hPoBQDFtVaC/H/fD7yBysVtg/ksRdgrQnWZ44E=
github.com/hbomb79/go-chanassert v0.2.0 h1:svFbIXEYDeSisAsfAYEqyvRR2H4G/nJeIHyxGbqRgZw=
github.com/hbomb79/go-chanassert v0.2.0/go.mod h1:r47M3hPoBQDFtVaC/H/fD7yBysVtg/ksRdgrQnWZ44E=
github.com/hbomb79/transcoder v1.1.2-0.20230606091455-01e3816a02ae h1:0Xd1ZkyLvGXgWpLGJ6AgIsiLnFnxWiBnDXZrZYP/Rwc=
github.com/hbomb79/transcoder v1.1.2-0.20230606091455-01e3816a02ae/go.mod h1:lT+f8NEGaHP6AVeYLicas0EI0+TforD+DRuLziJg6bY=
github.com/ilyakaznacheev/cleanenv v1.4.2 h1:nRqiriLMAC7tz7GzjzUTBHfzdzw6SQ7XvTagkFqe/zU=
Expand Down
12 changes: 11 additions & 1 deletion internal/database/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,16 @@ func New() *manager {
return &manager{}
}

// Connect will attempt to use the config provided to connect to
// a running database. The [SQLDialect] dictates which type of
// database we expect to reach.
//
// This method will try a number of times to Ping the database, before
// giving up with an error if it's unable to successfully establish a connection.
//
// On a successful connection, the relevant internal state is modified to contain
// instances to the newly-connected database, *and* any outstanding migrations
// are run using [executeMigrations].
func (db *manager) Connect(config DatabaseConfig) error {
dsn := fmt.Sprintf(SQLConnectionString, config.Host, config.User, config.Password, config.Name, config.Port)
sql, err := sql.Open(SQLDialect, dsn)
Expand Down Expand Up @@ -145,7 +155,7 @@ func (db *manager) executeMigrations() error {
return nil
}

// GetInstances returns the Goqu database connection if
// GetSqlxDB returns the Goqu database connection if
// one has been opened using 'Connect'. Otherwise, nil is returned.
func (db *manager) GetSqlxDB() *sqlx.DB {
return db.db
Expand Down
36 changes: 17 additions & 19 deletions internal/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,23 @@ var (
ErrWorkflowTargetIDMissing = errors.New("one or more of the targets provided cannot be found")
)

type (
// storeOrchestrator is responsible for managing all of Thea's resources,
// especially highly-relational data. You can think of all
// the data stores below this layer being 'dumb', and this store
// linking them together and providing the database instance
//
// If consumers need to be able to access data stores directly, they're
// welcome to do so - however caution should be taken as stores have no
// obligation to take care of relational data (which is the orchestrator's job).
storeOrchestrator struct {
db database.Manager
ev event.EventDispatcher
mediaStore *media.Store
transcodeStore *transcode.Store
workflowStore *workflow.Store
targetStore *ffmpeg.Store
userStore *user.Store
}
)
// storeOrchestrator is responsible for managing all of Thea's resources,
// especially highly-relational data. You can think of all
// the data stores below this layer being 'dumb', and this store
// linking them together and providing the database instance
//
// If consumers need to be able to access data stores directly, they're
// welcome to do so - however caution should be taken as stores have no
// obligation to take care of relational data (which is the orchestrator's job).
type storeOrchestrator struct {
db database.Manager
ev event.EventDispatcher
mediaStore *media.Store
transcodeStore *transcode.Store
workflowStore *workflow.Store
targetStore *ffmpeg.Store
userStore *user.Store
}

func newStoreOrchestrator(db database.Manager, eventBus event.EventDispatcher) (*storeOrchestrator, error) {
if db.GetSqlxDB() == nil {
Expand Down
2 changes: 1 addition & 1 deletion tests/helpers/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ func TempDirWithFiles(t *testing.T, files []string) (string, []string) {
dirPath := t.TempDir()
filePaths := make([]string, 0, len(files))
for _, filename := range files {
fileName, err := os.CreateTemp(dirPath, filename)
fileName, err := os.CreateTemp(dirPath, "*"+filename)
assert.Nil(t, err, "failed to create temporary file in temporary dir")
filePaths = append(filePaths, fileName.Name())
}
Expand Down
55 changes: 55 additions & 0 deletions tests/helpers/matchers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package helpers

import (
"github.com/hbomb79/Thea/internal/api/controllers/ingests"
"github.com/hbomb79/Thea/internal/http/websocket"
"github.com/hbomb79/Thea/internal/ingest"
"github.com/hbomb79/go-chanassert"
)

// MatchSocketMessage returns a matcher which will match messages which have
// the title and message type provided.
func MatchSocketMessage(title string, typ websocket.SocketMessageType) chanassert.Matcher[websocket.SocketMessage] {
return chanassert.MatchStructPartial(websocket.SocketMessage{Title: title, Type: typ})
}

// MatchIngestUpdate returns a chanassert matcher which will
// match any websocket messages regarding ingestion updates
// which contain the given ingest.
func MatchIngestUpdate(path string, state ingest.IngestItemState) chanassert.Matcher[websocket.SocketMessage] {
return chanassert.MatchPredicate(func(message websocket.SocketMessage) bool {
if message.Title != "INGEST_UPDATE" {
return false
}

updatedIngest, ok := message.Body["ingest"].(map[string]any)
if !ok {
return false
}

return updatedIngest["path"] == path && updatedIngest["state"] == string(ingests.IngestStateModelToDto(state))
})
}

// type wrapMatcher[T any] struct {
// matcher chanassert.Matcher[T]
// latch bool
// cb func(T)
// }

// func (wrapper *wrapMatcher[T]) DoesMatch(t T) bool {
// res := wrapper.matcher.DoesMatch(t)
// if res && !wrapper.latch {
// wrapper.latch = true
// wrapper.cb(t)
// }

// return res
// }

// // matchNotifyWrapper accepts a matcher and will wrap it
// // such that the given callback function is called when
// // the provided matcher successfully matches a message.
// func matchNotifyWrapper[T any](matcher chanassert.Matcher[T], cb func(t T)) chanassert.Matcher[T] {
// return &wrapMatcher[T]{matcher: matcher, cb: cb}
// }
50 changes: 50 additions & 0 deletions tests/helpers/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"time"

"github.com/gorilla/websocket"
theaws "github.com/hbomb79/Thea/internal/http/websocket"
"github.com/hbomb79/Thea/internal/user/permissions"
"github.com/hbomb79/Thea/tests/gen"
"github.com/hbomb79/go-chanassert"
"github.com/labstack/gommon/random"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -86,6 +88,51 @@ func (service *TestService) ConnectToActivitySocket(t *testing.T) *websocket.Con
return ws
}

// ActivityExpecter returns a chanassert expecter which will be setup
// with a single layer which expects the [theaws.Welcome] message.
//
// Tests can add additional layers to the expecter before calling .Listen on it
// to start the expecter.
func (service *TestService) ActivityExpecter(t *testing.T) chanassert.Expecter[theaws.SocketMessage] {
activityChan := service.ActivityChannel(t)

return chanassert.
NewChannelExpecter(activityChan).
Debug().
ExpectTimeout(
time.Millisecond*500,
chanassert.OneOf(MatchSocketMessage("CONNECTION_ESTABLISHED", theaws.Welcome)),
)
}

func (service *TestService) ActivityChannel(t *testing.T) chan theaws.SocketMessage {
ws := service.ConnectToActivitySocket(t)
assert.NotNil(t, ws, "expected websocket connection to be non-nil")

output := make(chan theaws.SocketMessage, 5)
go func() {
for {
var message theaws.SocketMessage
err := ws.ReadJSON(&message)
if err == nil {
t.Logf("WS: received message: %+v\n", message)
output <- message
continue
}

if websocket.IsCloseError(err, websocket.CloseAbnormalClosure) {
t.Logf("WS: warning: websocket connection abnormally closed (error %s). This is expected if the Thea instance was abruptly closed.", err)
} else {
t.Errorf("WS: unexpected error: %+v (%T)", err, err)
}

return
}
}()

return output
}

func (service *TestService) String() string {
return fmt.Sprintf("TestService{port=%d database=%s}", service.Port, service.DatabaseName)
}
Expand Down Expand Up @@ -144,10 +191,13 @@ func (service *TestService) NewClientWithRandomUser(t *testing.T) (TestUser, *ge
}

// waitForHealthy will ping the service (every pollFrequency) until the timeout is reached.
//
// If no successful request has been made when the timeout is reached, then the most
// recent error is returned to the caller, indicating that the service failed to become
// healthy (i.e. the service is not accepting HTTP connections).
func (service *TestService) waitForHealthy(t *testing.T, pollFrequency time.Duration, timeout time.Duration) error {
t.Logf("Waiting for Thea process to become healthy (poll every %s, timeout %s)...", pollFrequency, timeout)

client := service.NewClient(t)
attempts := timeout.Milliseconds() / pollFrequency.Milliseconds()
for attempt := range attempts {
Expand Down
3 changes: 1 addition & 2 deletions tests/helpers/service_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func (pool *TestServicePool) getOrCreate(t *testing.T, request TheaServiceReques
t.Logf("Request for Thea service '%s' has NO matching existing service. Spawning...", request)
pool.databaseManager.provisionDB(t, request.databaseName)
return spawnTheaProc(t, request)
// return spawnThea(t, request)
}

// TheaServiceRequest encapsulates information required to
Expand Down Expand Up @@ -130,7 +129,7 @@ func (req TheaServiceRequest) Key() string {
}

func (req TheaServiceRequest) String() string {
return fmt.Sprintf("Request{db=%s ingestDir=%s}", req.databaseName, req.ingestDirectory)
return fmt.Sprintf("ProvisioningRequest{db=%s ingestDir=%s}", req.databaseName, req.ingestDirectory)
}

func (req TheaServiceRequest) WithDatabaseName(databaseName string) TheaServiceRequest {
Expand Down
10 changes: 5 additions & 5 deletions tests/helpers/spawn.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ func spawnTheaProc(t *testing.T, req TheaServiceRequest) *TestService {
theaCmd.Env = append(theaCmd.Env, keyValueToEnv("DB_NAME", databaseName))

// If no ingest directory was specified, then specify one automatically
if _, ok := req.environmentVariables["INGEST_DIR"]; !ok {
theaCmd.Env = append(theaCmd.Env, keyValueToEnv("INGEST_DIR", t.TempDir()))
if req.ingestDirectory == "" {
req.ingestDirectory = t.TempDir()
}
theaCmd.Env = append(theaCmd.Env, keyValueToEnv("INGEST_DIR", req.ingestDirectory))

stdout, err := theaCmd.StdoutPipe()
if err != nil {
Expand All @@ -63,13 +64,13 @@ func spawnTheaProc(t *testing.T, req TheaServiceRequest) *TestService {
}
stderr, err := theaCmd.StderrPipe()
if err != nil {
t.Fatalf("failed to provision Thea instance: could not establish stdout pipe: %s", err)
t.Fatalf("failed to provision Thea instance: could not establish stderr pipe: %s", err)
return nil
}

err = theaCmd.Start()
if err != nil {
t.Fatalf("failed to provision Thea instance: could not run cmd: %s\n** Hint: have you run `make build`?", err)
t.Fatalf("failed to provision Thea instance: could not start process: %s", err)
return nil
}

Expand All @@ -94,7 +95,6 @@ func spawnTheaProc(t *testing.T, req TheaServiceRequest) *TestService {
fmt.Printf("Thea process (%d) for (%s) has closed it's output pipes\n", theaCmd.Process.Pid, req)
}()

t.Logf("Waiting for Thea process to become healthy (5s timeout)...")
srv := &TestService{Port: port, DatabaseName: databaseName, cleanup: cleanup}
if err := srv.waitForHealthy(t, 100*time.Millisecond, 5*time.Second); err != nil {
defer cleanup(t)
Expand Down
79 changes: 35 additions & 44 deletions tests/integration/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,54 +4,45 @@ import (
"testing"
"time"

"github.com/hbomb79/Thea/internal/http/websocket"
"github.com/hbomb79/Thea/internal/ingest"
"github.com/hbomb79/Thea/tests/gen"
"github.com/hbomb79/Thea/tests/helpers"
"github.com/hbomb79/go-chanassert"
"github.com/stretchr/testify/assert"
)

// TestIngestion_FailsToMetadataScrape ensures that files
// TestIngestion_MetadataFailure ensures that files
// which are not valid media files correctly reports failure. Retrying
// these ingestions should see the same error return.
func TestIngestion_FailsToMetadataScrape(t *testing.T) {
tempDir, _ := helpers.TempDirWithFiles(t, []string{"thisisnotavalidfile.mp4"})
srvReq := helpers.NewTheaServiceRequest().WithDatabaseName(t.Name()).WithIngestDirectory(tempDir)
srv := helpers.RequireThea(t, srvReq)

stream := ActivityStream(t, srv)
expecter := chanassert.NewChannelExpecter(stream).
Expect(chanassert.AllOf(chanassert.MatchStructPartial(
websocket.SocketMessage{Title: "CONNECTION_ESTABLISHED", Type: websocket.Welcome},
))).
Expect(chanassert.AllOf(chanassert.MatchStructPartial(
websocket.SocketMessage{Title: "INGEST_UPDATE", Body: map[string]interface{}{}, Type: websocket.Update},
)))
expecter.Listen()
defer expecter.AssertSatisfied(t, time.Second)
}

// ActivityStream returns a channel which will deliver
// messages received over Thea's activity stream socket. This socket
// connection will automatically close when the test finishes/cleans up.
func ActivityStream(t *testing.T, srv *helpers.TestService) chan websocket.SocketMessage {
socket := srv.ConnectToActivitySocket(t)
if err := socket.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
t.Fatalf("failed to set read deadline for activity socket connection: %s", err)
}

t.Cleanup(func() { socket.Close() })

output := make(chan websocket.SocketMessage, 10)
go func(deliveryChan chan websocket.SocketMessage) {
for {
var dest websocket.SocketMessage
err := socket.ReadJSON(&dest)
if err != nil {
t.Logf("WARNING: activity stream read JSON error: %s", err)
}

deliveryChan <- dest
}
}(output)

return output
func TestIngestion_MetadataFailure(t *testing.T) {
tempDir, paths := helpers.TempDirWithFiles(t, []string{"thisisnotavalidfile.mp4"})
req := helpers.NewTheaServiceRequest().
WithDatabaseName(t.Name()).
WithIngestDirectory(tempDir).
WithEnvironmentVariable("INGEST_MODTIME_THRESHOLD_SECONDS", "0")
srv := helpers.RequireThea(t, req)

exp := srv.ActivityExpecter(t).Expect(
chanassert.ExactlyNOf(2, helpers.MatchIngestUpdate(paths[0], ingest.Troubled)),
)
exp.Listen()

client := srv.NewClientWithDefaultAdminUser(t)

ingestsResponse, err := client.ListIngestsWithResponse(ctx)
assert.NoError(t, err)
assert.NotNil(t, ingestsResponse.JSON200, "expected JSON response to be non-nil")
assert.Len(t, *ingestsResponse.JSON200, 1, "expected ingests to have length 1")

ingestItem := (*ingestsResponse.JSON200)[0]
assert.Equal(t, gen.IngestStateTROUBLED, ingestItem.State)
assert.Contains(t, ingestItem.Trouble.AllowedResolutionTypes, gen.RETRY)

// Retry this item and observe that it will persistently fail. Sleep for the 'debounce' time
// of the WS messages to ensure we receive both
time.Sleep(time.Second * 2)

_, err = client.ResolveIngestWithResponse(ctx, ingestItem.Id, gen.ResolveIngestJSONRequestBody{Method: gen.RETRY, Context: map[string]string{}})
assert.NoError(t, err)
exp.AssertSatisfied(t, time.Second*6)
}

0 comments on commit af48a48

Please sign in to comment.