From 6aa583db7699b551048d93428c10b7d2d3a9ce96 Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Sun, 3 Jan 2016 08:25:31 +0700 Subject: [PATCH] Remove store type. --- CHANGELOG.md | 2 + apps.go | 179 ++++++++++++++----------------- certs.go | 11 +- cmd/empire/factories.go | 5 +- cmd/empire/migrate.go | 10 +- configs.go | 37 +++---- db.go | 155 ++++++++++++++++++++++++++- store_test.go => db_test.go | 0 deployments.go | 24 +++-- domains.go | 96 ++++++++--------- empire.go | 141 +++++++++++++++++------- empiretest/test.go | 9 +- ports.go | 29 ++--- processes.go | 64 ++--------- processes_test.go | 11 -- releases.go | 206 ++++++++++++++++++------------------ runner.go | 2 +- server/heroku/domains.go | 6 +- slugs.go | 15 +-- store.go | 142 ------------------------- tests/empire/empire_test.go | 91 +++++++++++++++- 21 files changed, 650 insertions(+), 585 deletions(-) rename store_test.go => db_test.go (100%) delete mode 100644 store.go diff --git a/CHANGELOG.md b/CHANGELOG.md index eefd38bd9..3c66b3085 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ * Allow floating point numbers to be provided when scaling the memory on a process [#694](https://github.com/remind101/empire/pull/694). * Empire will now update the SSL certificate on the associated ELB if it changes from `emp cert-attach` [#700](https://github.com/remind101/empire/pull/700). * The Tugboat integration now updates the deployment status with any errors that occurred [#709](https://github.com/remind101/empire/pull/709). +* Deploying a non-existent docker image to Empire will no longer create an app [#713](https://github.com/remind101/empire/pull/713). +* It's no longer necessary to re-deploy an application when scaling a process with new CPU or memory constraints [#713](https://github.com/remind101/empire/pull/713). **Security** diff --git a/apps.go b/apps.go index c3fd385d0..5fed8df65 100644 --- a/apps.go +++ b/apps.go @@ -104,35 +104,6 @@ func (q AppsQuery) Scope(db *gorm.DB) *gorm.DB { return scope.Scope(db) } -// AppsFind returns the first matching release. -func (s *store) AppsFind(scope Scope) (*App, error) { - var app App - return &app, s.First(scope, &app) -} - -// Apps returns all apps matching the scope. -func (s *store) Apps(scope Scope) ([]*App, error) { - var apps []*App - // Default to ordering by name. - scope = ComposedScope{Order("name"), scope} - return apps, s.Find(scope, &apps) -} - -// AppsCreate persists an app. -func (s *store) AppsCreate(app *App) (*App, error) { - return appsCreate(s.db, app) -} - -// AppsUpdate updates an app. -func (s *store) AppsUpdate(app *App) error { - return appsUpdate(s.db, app) -} - -// AppsDestroy destroys an app. -func (s *store) AppsDestroy(app *App) error { - return appsDestroy(s.db, app) -} - // AppID returns a scope to find an app by id. func AppID(id string) func(*gorm.DB) *gorm.DB { return func(db *gorm.DB) *gorm.DB { @@ -144,71 +115,27 @@ type appsService struct { *Empire } -func (s *appsService) AppsDestroy(ctx context.Context, app *App) error { - if err := s.Scheduler.Remove(ctx, app.ID); err != nil { +// Destroy destroys removes an app from the scheduler, then destroys it here. +func (s *appsService) Destroy(ctx context.Context, db *gorm.DB, app *App) error { + if err := appsDestroy(db, app); err != nil { return err } - return s.store.AppsDestroy(app) + return s.Scheduler.Remove(ctx, app.ID) } -// AppsEnsureRepo will set the repo if it's not set. -func (s *appsService) AppsEnsureRepo(app *App, repo string) error { - if app.Repo != nil { - return nil - } - - app.Repo = &repo - - return s.store.AppsUpdate(app) -} - -// AppsFindOrCreateByRepo first attempts to find an app by repo, falling back to -// creating a new app. -func (s *appsService) AppsFindOrCreateByRepo(repo string) (*App, error) { - n := AppNameFromRepo(repo) - a, err := s.store.AppsFind(AppsQuery{Name: &n}) - if err != nil && err != gorm.RecordNotFound { - return a, err - } - - // If the app wasn't found, create a new app. - if err != gorm.RecordNotFound { - return a, s.AppsEnsureRepo(a, repo) - } - - a = &App{ - Name: n, - Repo: &repo, +func (s *appsService) Restart(ctx context.Context, db *gorm.DB, opts RestartOpts) error { + if opts.PID != "" { + return s.Scheduler.Stop(ctx, opts.PID) } - return s.store.AppsCreate(a) -} - -// AppsCreate inserts the app into the database. -func appsCreate(db *gorm.DB, app *App) (*App, error) { - return app, db.Create(app).Error -} - -// AppsUpdate updates an app. -func appsUpdate(db *gorm.DB, app *App) error { - return db.Save(app).Error -} - -// AppsDestroy destroys an app. -func appsDestroy(db *gorm.DB, app *App) error { - return db.Delete(app).Error -} - -// scaler is a small service for scaling an apps process. -type scaler struct { - *Empire + return s.releases.ReleaseApp(ctx, db, opts.App) } -func (s *scaler) Scale(ctx context.Context, opts ScaleOpts) (*Process, error) { +func (s *appsService) Scale(ctx context.Context, db *gorm.DB, opts ScaleOpts) (*Process, error) { app, t, quantity, c := opts.App, opts.Process, opts.Quantity, opts.Constraints - release, err := s.store.ReleasesFind(ReleasesQuery{App: app}) + release, err := releasesFind(db, ReleasesQuery{App: app}) if err != nil { return nil, err } @@ -217,13 +144,8 @@ func (s *scaler) Scale(ctx context.Context, opts ScaleOpts) (*Process, error) { return nil, &ValidationError{Err: fmt.Errorf("no releases for %s", app.Name)} } - f, err := s.store.Formation(ProcessesQuery{Release: release}) - if err != nil { - return nil, err - } - - p, ok := f[t] - if !ok { + p := release.Process(t) + if p == nil { return nil, &ValidationError{Err: fmt.Errorf("no %s process type in release", t)} } @@ -240,22 +162,83 @@ func (s *scaler) Scale(ctx context.Context, opts ScaleOpts) (*Process, error) { p.Constraints = *c } - if err := s.store.ProcessesUpdate(p); err != nil { + if err := processesUpdate(db, p); err != nil { return nil, err } + // If there are no changes to the process size, we can do a quick scale + // up, otherwise, we will resubmit the release to the scheduler. + if c == nil { + err = s.Scheduler.Scale(ctx, release.AppID, string(p.Type), uint(quantity)) + } else { + err = s.releases.Release(ctx, release) + } + + if err != nil { + return p, err + } + return p, s.PublishEvent(event) } -// restarter is a small service for restarting an apps processes. -type restarter struct { - *Empire +// appsEnsureRepo will set the repo if it's not set. +func appsEnsureRepo(db *gorm.DB, app *App, repo string) error { + if app.Repo != nil { + return nil + } + + app.Repo = &repo + + return appsUpdate(db, app) } -func (s *restarter) Restart(ctx context.Context, opts RestartOpts) error { - if opts.PID != "" { - return s.Scheduler.Stop(ctx, opts.PID) +// appsFindOrCreateByRepo first attempts to find an app by repo, falling back to +// creating a new app. +func appsFindOrCreateByRepo(db *gorm.DB, repo string) (*App, error) { + n := AppNameFromRepo(repo) + a, err := appsFind(db, AppsQuery{Name: &n}) + if err != nil && err != gorm.RecordNotFound { + return a, err + } + + // If the app wasn't found, create a new app. + if err != gorm.RecordNotFound { + return a, appsEnsureRepo(db, a, repo) } - return s.releaser.ReleaseApp(ctx, opts.App) + a = &App{ + Name: n, + Repo: &repo, + } + + return appsCreate(db, a) +} + +// appsFind finds a single app given the scope. +func appsFind(db *gorm.DB, scope Scope) (*App, error) { + var app App + return &app, first(db, scope, &app) +} + +// apps finds all apps matching the scope. +func apps(db *gorm.DB, scope Scope) ([]*App, error) { + var apps []*App + // Default to ordering by name. + scope = ComposedScope{Order("name"), scope} + return apps, find(db, scope, &apps) +} + +// appsCreate inserts the app into the database. +func appsCreate(db *gorm.DB, app *App) (*App, error) { + return app, db.Create(app).Error +} + +// appsUpdate updates an app. +func appsUpdate(db *gorm.DB, app *App) error { + return db.Save(app).Error +} + +// appsDestroy destroys an app. +func appsDestroy(db *gorm.DB, app *App) error { + return db.Delete(app).Error } diff --git a/certs.go b/certs.go index 663d8038a..d980d0268 100644 --- a/certs.go +++ b/certs.go @@ -1,19 +1,22 @@ package empire -import "golang.org/x/net/context" +import ( + "github.com/jinzhu/gorm" + "golang.org/x/net/context" +) type certsService struct { *Empire } -func (s *certsService) CertsAttach(ctx context.Context, app *App, cert string) error { +func (s *certsService) CertsAttach(ctx context.Context, db *gorm.DB, app *App, cert string) error { app.Cert = cert - if err := s.store.AppsUpdate(app); err != nil { + if err := appsUpdate(db, app); err != nil { return err } - if err := s.releaser.ReleaseApp(ctx, app); err != nil { + if err := s.releases.ReleaseApp(ctx, db, app); err != nil { if err == ErrNoReleases { return nil } diff --git a/cmd/empire/factories.go b/cmd/empire/factories.go index 07ebd03d2..a7f5f2e29 100644 --- a/cmd/empire/factories.go +++ b/cmd/empire/factories.go @@ -11,7 +11,6 @@ import ( "github.com/aws/aws-sdk-go/aws/session" "github.com/codegangsta/cli" "github.com/inconshreveable/log15" - "github.com/jinzhu/gorm" "github.com/remind101/empire" "github.com/remind101/empire/events/sns" "github.com/remind101/empire/pkg/dockerutil" @@ -24,8 +23,8 @@ import ( // DB =================================== -func newDB(c *cli.Context) (*gorm.DB, error) { - return empire.NewDB(c.String(FlagDB)) +func newDB(c *cli.Context) (*empire.DB, error) { + return empire.OpenDB(c.String(FlagDB)) } // Empire =============================== diff --git a/cmd/empire/migrate.go b/cmd/empire/migrate.go index 7ee50e0a2..f054cde5b 100644 --- a/cmd/empire/migrate.go +++ b/cmd/empire/migrate.go @@ -5,14 +5,16 @@ import ( "log" "github.com/codegangsta/cli" - "github.com/remind101/empire" ) func runMigrate(c *cli.Context) { - path := c.String(FlagDBPath) - db := c.String(FlagDB) + db, err := newDB(c) + if err != nil { + log.Fatal(err) + } - errors, ok := empire.Migrate(db, path) + path := c.String(FlagDBPath) + errors, ok := db.MigrateUp(path) if !ok { log.Fatal(errors) } diff --git a/configs.go b/configs.go index 228879e32..6344782ea 100644 --- a/configs.go +++ b/configs.go @@ -102,16 +102,11 @@ func (q ConfigsQuery) Scope(db *gorm.DB) *gorm.DB { return scope.Scope(db) } -// ConfigsFirst returns the first matching config. -func (s *store) ConfigsFirst(scope Scope) (*Config, error) { +// configsFind returns the first matching config. +func configsFind(db *gorm.DB, scope Scope) (*Config, error) { var config Config scope = ComposedScope{Order("created_at desc"), scope} - return &config, s.First(scope, &config) -} - -// ConfigsCreate persists the Config. -func (s *store) ConfigsCreate(config *Config) (*Config, error) { - return configsCreate(s.db, config) + return &config, first(db, scope, &config) } // ConfigsCreate inserts a Config in the database. @@ -123,20 +118,20 @@ type configsService struct { *Empire } -func (s *configsService) Set(ctx context.Context, opts SetOpts) (*Config, error) { +func (s *configsService) Set(ctx context.Context, db *gorm.DB, opts SetOpts) (*Config, error) { app, vars := opts.App, opts.Vars - old, err := s.Config(app) + old, err := s.Config(db, app) if err != nil { return nil, err } - c, err := s.store.ConfigsCreate(NewConfig(old, vars)) + c, err := configsCreate(db, NewConfig(old, vars)) if err != nil { return c, err } - release, err := s.store.ReleasesFind(ReleasesQuery{App: app}) + release, err := releasesFind(db, ReleasesQuery{App: app}) if err != nil { if err == gorm.RecordNotFound { err = nil @@ -146,7 +141,7 @@ func (s *configsService) Set(ctx context.Context, opts SetOpts) (*Config, error) } // Create new release based on new config and old slug - _, err = s.releases.ReleasesCreate(ctx, &Release{ + _, err = s.releases.Create(ctx, db, &Release{ App: release.App, Config: c, Slug: release.Slug, @@ -156,18 +151,20 @@ func (s *configsService) Set(ctx context.Context, opts SetOpts) (*Config, error) } // Returns configs for latest release or the latest configs if there are no releases. -func (s *configsService) Config(app *App) (*Config, error) { - r, err := s.store.ReleasesFind(ReleasesQuery{App: app}) +func (s *configsService) Config(db *gorm.DB, app *App) (*Config, error) { + r, err := releasesFind(db, ReleasesQuery{App: app}) if err != nil { if err == gorm.RecordNotFound { // It's possible to have config without releases, this handles that. - c, err := s.store.ConfigsFirst(ConfigsQuery{App: app}) + c, err := configsFind(db, ConfigsQuery{App: app}) if err != nil { if err == gorm.RecordNotFound { - return s.store.ConfigsCreate(&Config{ - App: app, - Vars: make(Vars), - }) + // Return an empty config. + return &Config{ + AppID: app.ID, + App: app, + Vars: make(Vars), + }, nil } return nil, err } diff --git a/db.go b/db.go index 72e464c29..75bc9d2ff 100644 --- a/db.go +++ b/db.go @@ -2,13 +2,23 @@ package empire import ( "database/sql" + "fmt" "net/url" "github.com/jinzhu/gorm" + "github.com/mattes/migrate/migrate" + "github.com/remind101/empire/pkg/headerutil" ) -// NewDB returns a new gorm.DB instance. -func NewDB(uri string) (*gorm.DB, error) { +// DB wraps a gorm.DB and provides the datastore layer for Empire. +type DB struct { + *gorm.DB + + uri string +} + +// OpenDB returns a new gorm.DB instance. +func OpenDB(uri string) (*DB, error) { u, err := url.Parse(uri) if err != nil { return nil, err @@ -24,5 +34,144 @@ func NewDB(uri string) (*gorm.DB, error) { return nil, err } - return &db, nil + return &DB{ + DB: &db, + uri: uri, + }, nil +} + +// MigrateUp migrates the database to the latest version of the schema. +func (db *DB) MigrateUp(path string) ([]error, bool) { + return migrate.UpSync(db.uri, path) +} + +// Reset resets the database to a pristine state. +func (db *DB) Reset() error { + var err error + exec := func(sql string) { + if err == nil { + err = db.Exec(sql).Error + } + } + + exec(`TRUNCATE TABLE apps CASCADE`) + exec(`TRUNCATE TABLE ports CASCADE`) + exec(`INSERT INTO ports (port) (SELECT generate_series(9000,10000))`) + + return err +} + +// IsHealthy checks that we can connect to the database. +func (db *DB) IsHealthy() bool { + return db.DB.DB().Ping() == nil +} + +// Debug puts the db in debug mode, which logs all queries. +func (db *DB) Debug() { + db.DB = db.DB.Debug() +} + +// Scope is an interface that scopes a gorm.DB. Scopes are used in +// ThingsFirst and ThingsAll methods on the store for filtering/querying. +type Scope interface { + Scope(*gorm.DB) *gorm.DB +} + +// ScopeFunc implements the Scope interface for functions. +type ScopeFunc func(*gorm.DB) *gorm.DB + +// Scope implements the Scope interface. +func (f ScopeFunc) Scope(db *gorm.DB) *gorm.DB { + return f(db) +} + +// All returns a scope that simply returns the db. +var All = ScopeFunc(func(db *gorm.DB) *gorm.DB { + return db +}) + +// ID returns a Scope that will find the item by id. +func ID(id string) Scope { + return FieldEquals("id", id) +} + +// ForApp returns a Scope that will filter items belonging the the given app. +func ForApp(app *App) Scope { + return FieldEquals("app_id", app.ID) +} + +// ComposedScope is an implementation of the Scope interface that chains the +// scopes together. +type ComposedScope []Scope + +// Scope implements the Scope interface. +func (s ComposedScope) Scope(db *gorm.DB) *gorm.DB { + for _, s := range s { + db = s.Scope(db) + } + + return db +} + +// FieldEquals returns a Scope that filters on a field. +func FieldEquals(field string, v interface{}) Scope { + return ScopeFunc(func(db *gorm.DB) *gorm.DB { + return db.Where(fmt.Sprintf("%s = ?", field), v) + }) +} + +// Preload returns a Scope that preloads the associations. +func Preload(associations ...string) Scope { + var scope ComposedScope + + for _, a := range associations { + aa := a + scope = append(scope, ScopeFunc(func(db *gorm.DB) *gorm.DB { + return db.Preload(aa) + })) + } + + return scope +} + +// Order returns a Scope that orders the results. +func Order(order string) Scope { + return ScopeFunc(func(db *gorm.DB) *gorm.DB { + return db.Order(order) + }) +} + +// Limit returns a Scope that limits the results. +func Limit(limit int) Scope { + return ScopeFunc(func(db *gorm.DB) *gorm.DB { + return db.Limit(limit) + }) +} + +// Range returns a Scope that limits and orders the results. +func Range(r headerutil.Range) Scope { + var scope ComposedScope + + if r.Max != nil { + scope = append(scope, Limit(*r.Max)) + } + + if r.Sort != nil && r.Order != nil { + order := fmt.Sprintf("%s %s", *r.Sort, *r.Order) + scope = append(scope, Order(order)) + } + + return scope +} + +// first is a small helper that finds the first record matching a scope, and +// returns the error. +func first(db *gorm.DB, scope Scope, v interface{}) error { + return scope.Scope(db).First(v).Error +} + +// find is a small helper that finds records matching the scope, and returns the +// error. +func find(db *gorm.DB, scope Scope, v interface{}) error { + return scope.Scope(db).Find(v).Error } diff --git a/store_test.go b/db_test.go similarity index 100% rename from store_test.go rename to db_test.go diff --git a/deployments.go b/deployments.go index b23ab68e4..3a21376ad 100644 --- a/deployments.go +++ b/deployments.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/docker/docker/pkg/jsonmessage" + "github.com/jinzhu/gorm" "golang.org/x/net/context" ) @@ -14,34 +15,34 @@ type deployerService struct { *Empire } -// doDeploy does the actual deployment -func (s *deployerService) doDeploy(ctx context.Context, opts DeploymentsCreateOpts) (*Release, error) { +// deploy does the actual deployment +func (s *deployerService) deploy(ctx context.Context, db *gorm.DB, opts DeploymentsCreateOpts) (*Release, error) { app, img := opts.App, opts.Image // If no app is specified, attempt to find the app that relates to this // images repository, or create it if not found. if app == nil { var err error - app, err = s.apps.AppsFindOrCreateByRepo(img.Repository) + app, err = appsFindOrCreateByRepo(db, img.Repository) if err != nil { return nil, err } } else { // If the app doesn't already have a repo attached to it, we'll attach // this image's repo. - if err := s.apps.AppsEnsureRepo(app, img.Repository); err != nil { + if err := appsEnsureRepo(db, app, img.Repository); err != nil { return nil, err } } // Grab the latest config. - config, err := s.Config(app) + config, err := s.configs.Config(db, app) if err != nil { return nil, err } // Create a new slug for the docker image. - slug, err := s.slugs.SlugsCreateByImage(ctx, img, opts.Output) + slug, err := s.slugs.Create(ctx, db, img, opts.Output) if err != nil { return nil, err } @@ -50,7 +51,7 @@ func (s *deployerService) doDeploy(ctx context.Context, opts DeploymentsCreateOp // and Slug. desc := fmt.Sprintf("Deploy %s", img.String()) - r, err := s.releases.ReleasesCreate(ctx, &Release{ + r, err := s.releases.Create(ctx, db, &Release{ App: app, Config: config, Slug: slug, @@ -60,11 +61,12 @@ func (s *deployerService) doDeploy(ctx context.Context, opts DeploymentsCreateOp return r, err } -// Deploy is a thin wrapper around doDeploy to handle errors & output more cleanly -func (s *deployerService) Deploy(ctx context.Context, opts DeploymentsCreateOpts) (*Release, error) { +// Deploy is a thin wrapper around deploy to that adds the error to the +// jsonmessage stream. +func (s *deployerService) Deploy(ctx context.Context, db *gorm.DB, opts DeploymentsCreateOpts) (*Release, error) { var msg jsonmessage.JSONMessage - r, err := s.doDeploy(ctx, opts) + r, err := s.deploy(ctx, db, opts) if err != nil { msg = newJSONMessageError(err) } else { @@ -75,5 +77,5 @@ func (s *deployerService) Deploy(ctx context.Context, opts DeploymentsCreateOpts return r, err } - return r, nil + return r, err } diff --git a/domains.go b/domains.go index 94d689f59..f23a77343 100644 --- a/domains.go +++ b/domains.go @@ -3,6 +3,8 @@ package empire import ( "errors" + "golang.org/x/net/context" + "time" "github.com/jinzhu/gorm" @@ -34,8 +36,8 @@ type domainsService struct { *Empire } -func (s *domainsService) DomainsCreate(domain *Domain) (*Domain, error) { - d, err := s.store.DomainsFind(DomainsQuery{Hostname: &domain.Hostname}) +func (s *domainsService) DomainsCreate(ctx context.Context, db *gorm.DB, domain *Domain) (*Domain, error) { + d, err := domainsFind(db, DomainsQuery{Hostname: &domain.Hostname}) if err != nil && err != gorm.RecordNotFound { return domain, err } @@ -48,31 +50,31 @@ func (s *domainsService) DomainsCreate(domain *Domain) (*Domain, error) { } } - _, err = s.store.DomainsCreate(domain) + _, err = domainsCreate(db, domain) if err != nil { return domain, err } - if err := s.makePublic(domain.AppID); err != nil { + if err := makePublic(db, domain.AppID); err != nil { return domain, err } return domain, err } -func (s *domainsService) DomainsDestroy(domain *Domain) error { - if err := s.store.DomainsDestroy(domain); err != nil { +func (s *domainsService) DomainsDestroy(ctx context.Context, db *gorm.DB, domain *Domain) error { + if err := domainsDestroy(db, domain); err != nil { return err } // If app has no domains associated, make it private - d, err := s.store.Domains(DomainsQuery{App: domain.App}) + d, err := domains(db, DomainsQuery{App: domain.App}) if err != nil { return err } if len(d) == 0 { - if err := s.makePrivate(domain.AppID); err != nil { + if err := makePrivate(db, domain.AppID); err != nil { return err } } @@ -80,34 +82,6 @@ func (s *domainsService) DomainsDestroy(domain *Domain) error { return nil } -func (s *domainsService) makePublic(appID string) error { - a, err := s.store.AppsFind(AppsQuery{ID: &appID}) - if err != nil { - return err - } - - a.Exposure = "public" - if err := s.store.AppsUpdate(a); err != nil { - return err - } - - return nil -} - -func (s *domainsService) makePrivate(appID string) error { - a, err := s.store.AppsFind(AppsQuery{ID: &appID}) - if err != nil { - return err - } - - a.Exposure = "private" - if err := s.store.AppsUpdate(a); err != nil { - return err - } - - return nil -} - // DomainsQuery is a Scope implementation for common things to filter releases // by. type DomainsQuery struct { @@ -133,26 +107,16 @@ func (q DomainsQuery) Scope(db *gorm.DB) *gorm.DB { return scope.Scope(db) } -// DomainsFind returns the first matching domain. -func (s *store) DomainsFind(scope Scope) (*Domain, error) { +// domainsFind returns the first matching domain. +func domainsFind(db *gorm.DB, scope Scope) (*Domain, error) { var domain Domain - return &domain, s.First(scope, &domain) + return &domain, first(db, scope, &domain) } -// Domains returns all domains matching the scope. -func (s *store) Domains(scope Scope) ([]*Domain, error) { +// domains returns all domains matching the scope. +func domains(db *gorm.DB, scope Scope) ([]*Domain, error) { var domains []*Domain - return domains, s.Find(scope, &domains) -} - -// DomainsCreate persists the Domain. -func (s *store) DomainsCreate(domain *Domain) (*Domain, error) { - return domainsCreate(s.db, domain) -} - -// DomainsDestroy destroys the Domain. -func (s *store) DomainsDestroy(domain *Domain) error { - return domainsDestroy(s.db, domain) + return domains, find(db, scope, &domains) } func domainsCreate(db *gorm.DB, domain *Domain) (*Domain, error) { @@ -162,3 +126,31 @@ func domainsCreate(db *gorm.DB, domain *Domain) (*Domain, error) { func domainsDestroy(db *gorm.DB, domain *Domain) error { return db.Delete(domain).Error } + +func makePublic(db *gorm.DB, appID string) error { + a, err := appsFind(db, AppsQuery{ID: &appID}) + if err != nil { + return err + } + + a.Exposure = "public" + if err := appsUpdate(db, a); err != nil { + return err + } + + return nil +} + +func makePrivate(db *gorm.DB, appID string) error { + a, err := appsFind(db, AppsQuery{ID: &appID}) + if err != nil { + return err + } + + a.Exposure = "private" + if err := appsUpdate(db, a); err != nil { + return err + } + + return nil +} diff --git a/empire.go b/empire.go index 482b8860c..27fdf123b 100644 --- a/empire.go +++ b/empire.go @@ -9,7 +9,6 @@ import ( "github.com/fsouza/go-dockerclient" "github.com/inconshreveable/log15" "github.com/jinzhu/gorm" - "github.com/mattes/migrate/migrate" "github.com/remind101/empire/pkg/dockerutil" "github.com/remind101/empire/pkg/image" "github.com/remind101/empire/procfile" @@ -52,7 +51,8 @@ type Empire struct { // Logger is a log15 logger that will be used for logging. Logger log15.Logger - store *store + DB *DB + db *gorm.DB accessTokens *accessTokensService apps *appsService @@ -60,10 +60,7 @@ type Empire struct { domains *domainsService tasks *tasksService releases *releasesService - releaser *releaser deployer *deployerService - scaler *scaler - restarter *restarter runner *runnerService slugs *slugsService certs *certsService @@ -83,12 +80,14 @@ type Empire struct { } // New returns a new Empire instance. -func New(db *gorm.DB, options Options) *Empire { +func New(db *DB, options Options) *Empire { e := &Empire{ Logger: nullLogger(), LogsStreamer: logsDisabled, EventStream: NullEventStream, - store: &store{db: db}, + + DB: db, + db: db.DB, } e.accessTokens = &accessTokensService{Secret: []byte(options.Secret)} @@ -98,11 +97,8 @@ func New(db *gorm.DB, options Options) *Empire { e.domains = &domainsService{Empire: e} e.slugs = &slugsService{Empire: e} e.tasks = &tasksService{Empire: e} - e.scaler = &scaler{Empire: e} - e.restarter = &restarter{Empire: e} e.runner = &runnerService{Empire: e} e.releases = &releasesService{Empire: e} - e.releaser = &releaser{Empire: e} e.certs = &certsService{Empire: e} return e } @@ -119,12 +115,12 @@ func (e *Empire) AccessTokensCreate(accessToken *AccessToken) (*AccessToken, err // AppsFind finds the first app matching the query. func (e *Empire) AppsFind(q AppsQuery) (*App, error) { - return e.store.AppsFind(q) + return appsFind(e.db, q) } // Apps returns all Apps. func (e *Empire) Apps(q AppsQuery) ([]*App, error) { - return e.store.Apps(q) + return apps(e.db, q) } // CreateOpts are options that are provided when creating a new application. @@ -145,7 +141,7 @@ func (opts CreateOpts) Event() CreateEvent { // Create creates a new app. func (e *Empire) Create(ctx context.Context, opts CreateOpts) (*App, error) { - a, err := e.store.AppsCreate(&App{Name: opts.Name}) + a, err := appsCreate(e.db, &App{Name: opts.Name}) if err != nil { return a, err } @@ -171,7 +167,14 @@ func (opts DestroyOpts) Event() DestroyEvent { // Destroy destroys an app. func (e *Empire) Destroy(ctx context.Context, opts DestroyOpts) error { - if err := e.apps.AppsDestroy(ctx, opts.App); err != nil { + tx := e.db.Begin() + + if err := e.apps.Destroy(ctx, tx, opts.App); err != nil { + tx.Rollback() + return err + } + + if err := tx.Commit().Error; err != nil { return err } @@ -180,7 +183,19 @@ func (e *Empire) Destroy(ctx context.Context, opts DestroyOpts) error { // Config returns the current Config for a given app. func (e *Empire) Config(app *App) (*Config, error) { - return e.configs.Config(app) + tx := e.db.Begin() + + c, err := e.configs.Config(tx, app) + if err != nil { + tx.Rollback() + return c, err + } + + if err := tx.Commit().Error; err != nil { + return c, err + } + + return c, nil } // SetOpts are options provided when setting new config vars on an app. @@ -212,8 +227,15 @@ func (opts SetOpts) Event() SetEvent { // Config. If the app has a running release, a new release will be created and // run. func (e *Empire) Set(ctx context.Context, opts SetOpts) (*Config, error) { - c, err := e.configs.Set(ctx, opts) + tx := e.db.Begin() + + c, err := e.configs.Set(ctx, tx, opts) if err != nil { + tx.Rollback() + return c, err + } + + if err := tx.Commit().Error; err != nil { return c, err } @@ -222,22 +244,45 @@ func (e *Empire) Set(ctx context.Context, opts SetOpts) (*Config, error) { // DomainsFind returns the first domain matching the query. func (e *Empire) DomainsFind(q DomainsQuery) (*Domain, error) { - return e.store.DomainsFind(q) + return domainsFind(e.db, q) } // Domains returns all domains matching the query. func (e *Empire) Domains(q DomainsQuery) ([]*Domain, error) { - return e.store.Domains(q) + return domains(e.db, q) } // DomainsCreate adds a new Domain for an App. -func (e *Empire) DomainsCreate(domain *Domain) (*Domain, error) { - return e.domains.DomainsCreate(domain) +func (e *Empire) DomainsCreate(ctx context.Context, domain *Domain) (*Domain, error) { + tx := e.db.Begin() + + d, err := e.domains.DomainsCreate(ctx, tx, domain) + if err != nil { + tx.Rollback() + return d, err + } + + if err := tx.Commit().Error; err != nil { + return d, err + } + + return d, nil } // DomainsDestroy removes a Domain for an App. -func (e *Empire) DomainsDestroy(domain *Domain) error { - return e.domains.DomainsDestroy(domain) +func (e *Empire) DomainsDestroy(ctx context.Context, domain *Domain) error { + tx := e.db.Begin() + + if err := e.domains.DomainsDestroy(ctx, tx, domain); err != nil { + tx.Rollback() + return err + } + + if err := tx.Commit().Error; err != nil { + return err + } + + return nil } // Tasks returns the Tasks for the given app. @@ -269,7 +314,7 @@ func (opts RestartOpts) Event() RestartEvent { // Restart restarts processes matching the given prefix for the given Release. // If the prefix is empty, it will match all processes for the release. func (e *Empire) Restart(ctx context.Context, opts RestartOpts) error { - if err := e.restarter.Restart(ctx, opts); err != nil { + if err := e.apps.Restart(ctx, e.db, opts); err != nil { return err } @@ -323,12 +368,12 @@ func (e *Empire) Run(ctx context.Context, opts RunOpts) error { // Releases returns all Releases for a given App. func (e *Empire) Releases(q ReleasesQuery) ([]*Release, error) { - return e.store.Releases(q) + return releases(e.db, q) } // ReleasesFind returns the first releases for a given App. func (e *Empire) ReleasesFind(q ReleasesQuery) (*Release, error) { - return e.store.ReleasesFind(q) + return releasesFind(e.db, q) } // RollbackOpts are options provided when rolling back to an old release. @@ -354,8 +399,15 @@ func (opts RollbackOpts) Event() RollbackEvent { // Rollback rolls an app back to a specific release version. Returns a // new release. func (e *Empire) Rollback(ctx context.Context, opts RollbackOpts) (*Release, error) { - r, err := e.releases.Rollback(ctx, opts) + tx := e.db.Begin() + + r, err := e.releases.Rollback(ctx, tx, opts) if err != nil { + tx.Rollback() + return r, err + } + + if err := tx.Commit().Error; err != nil { return r, err } @@ -392,8 +444,15 @@ func (opts DeploymentsCreateOpts) Event() DeployEvent { // Deploy deploys an image and streams the output to w. func (e *Empire) Deploy(ctx context.Context, opts DeploymentsCreateOpts) (*Release, error) { - r, err := e.deployer.Deploy(ctx, opts) + tx := e.db.Begin() + + r, err := e.deployer.Deploy(ctx, tx, opts) if err != nil { + tx.Rollback() + return r, err + } + + if err := tx.Commit().Error; err != nil { return r, err } @@ -429,7 +488,15 @@ func (opts ScaleOpts) Event() ScaleEvent { // Scale scales an apps process. func (e *Empire) Scale(ctx context.Context, opts ScaleOpts) (*Process, error) { - return e.scaler.Scale(ctx, opts) + tx := e.db.Begin() + + p, err := e.apps.Scale(ctx, tx, opts) + if err != nil { + tx.Rollback() + return p, err + } + + return p, tx.Commit().Error } // Streamlogs streams logs from an app. @@ -439,23 +506,25 @@ func (e *Empire) StreamLogs(app *App, w io.Writer) error { // CertsAttach attaches an SSL certificate to the app. func (e *Empire) CertsAttach(ctx context.Context, app *App, cert string) error { - return e.certs.CertsAttach(ctx, app, cert) + tx := e.db.Begin() + + if err := e.certs.CertsAttach(ctx, tx, app, cert); err != nil { + tx.Rollback() + return err + } + + return tx.Commit().Error } // Reset resets empire. func (e *Empire) Reset() error { - return e.store.Reset() + return e.DB.Reset() } // IsHealthy returns true if Empire is healthy, which means it can connect to // the services it depends on. func (e *Empire) IsHealthy() bool { - return e.store.IsHealthy() -} - -// Migrate runs the migrations. -func Migrate(db, path string) ([]error, bool) { - return migrate.UpSync(db, path) + return e.DB.IsHealthy() } // ValidationError is returned when a model is not valid. diff --git a/empiretest/test.go b/empiretest/test.go index b58f04989..b0f4d0511 100644 --- a/empiretest/test.go +++ b/empiretest/test.go @@ -27,11 +27,18 @@ var ( // NewEmpire returns a new Empire instance suitable for testing. It ensures that // the database is clean before returning. func NewEmpire(t testing.TB) *empire.Empire { - db, err := empire.NewDB(DatabaseURL) + db, err := empire.OpenDB(DatabaseURL) if err != nil { t.Fatal(err) } + // TODO: Run db.MigrateUp once migrations are in the binary. + + // Log queries if verbose mode is set. + if testing.Verbose() { + db.Debug() + } + e := empire.New(db, empire.DefaultOptions) e.Scheduler = scheduler.NewFakeScheduler() e.ExtractProcfile = ExtractProcfile diff --git a/ports.go b/ports.go index d2c7a92f6..d103c7ad4 100644 --- a/ports.go +++ b/ports.go @@ -14,52 +14,35 @@ type Port struct { var ErrNoPorts = errors.New("no ports avaiable") -func (s *store) PortsFindOrCreateByApp(app *App) (*Port, error) { - p, err := s.PortsFindByApp(app) +func portsFindOrCreateByApp(db *gorm.DB, app *App) (*Port, error) { + p, err := portsFindByApp(db, app) // If an error occurred or we found a port, return. if err != nil || p != nil { return p, err } - return s.PortsAssign(app) + return portsAssign(db, app) } -func (s *store) PortsFindByApp(app *App) (*Port, error) { - return portsFindByApp(s.db, app) -} - -func (s *store) PortsAssign(app *App) (*Port, error) { +func portsAssign(db *gorm.DB, app *App) (*Port, error) { var port *Port - t := s.db.Begin() - - port, err := portsFindAvailable(t) + port, err := portsFindAvailable(db) if err != nil { - t.Rollback() return port, err } // Assign app to port port.AppID = &app.ID - if err := portsUpdate(t, port); err != nil { - t.Rollback() - return port, err - } - - if err := t.Commit().Error; err != nil { - t.Rollback() + if err := portsUpdate(db, port); err != nil { return port, err } return port, nil } -func (s *store) PortsUnassign(app *App) error { - return portsUnassign(s.db, app) -} - func portsFindByApp(db *gorm.DB, app *App) (*Port, error) { var port Port if err := db.Where("app_id = ?", app.ID).Order("port").First(&port).Error; err != nil { diff --git a/processes.go b/processes.go index c8d1bbd89..86e64c928 100644 --- a/processes.go +++ b/processes.go @@ -75,15 +75,13 @@ func (c Command) Value() (driver.Value, error) { // Process holds configuration information about a Process Type. type Process struct { - ID string - Type ProcessType - Quantity int - Command Command - Port int `sql:"-"` - Constraints - ReleaseID string - Release *Release + ID string + Type ProcessType + Quantity int + Command Command + Port int `sql:"-"` + Constraints } // NewProcess returns a new Process instance. @@ -242,55 +240,7 @@ func (f Formation) Processes() []*Process { return processes } -// ProcessesQuery is a Scope implementation for common things to filter -// processes by. -type ProcessesQuery struct { - // If provided, finds only processes belonging to the given release. - Release *Release -} - -// Scope implements the Scope interface. -func (q ProcessesQuery) Scope(db *gorm.DB) *gorm.DB { - var scope ComposedScope - - if q.Release != nil { - scope = append(scope, FieldEquals("release_id", q.Release.ID)) - } - - return scope.Scope(db) -} - -// Processes returns all processes matching the scope. -func (s *store) Processes(scope Scope) ([]*Process, error) { - var processes []*Process - return processes, s.Find(scope, &processes) -} - -// Formation returns a Formation for the processes matching the scope. -func (s *store) Formation(scope Scope) (Formation, error) { - p, err := s.Processes(scope) - if err != nil { - return nil, err - } - return newFormation(p), nil -} - -// ProcessesCreate persists the process. -func (s *store) ProcessesCreate(process *Process) (*Process, error) { - return processesCreate(s.db, process) -} - -// ProcessesUpdate updates the process. -func (s *store) ProcessesUpdate(process *Process) error { - return processesUpdate(s.db, process) -} - -// ProcessesCreate inserts a process into the database. -func processesCreate(db *gorm.DB, process *Process) (*Process, error) { - return process, db.Create(process).Error -} - -// ProcessesUpdate updates an existing process into the database. +// processesUpdate updates an existing process into the database. func processesUpdate(db *gorm.DB, process *Process) error { return db.Save(process).Error } diff --git a/processes_test.go b/processes_test.go index 33fe669f5..ab44bcf5e 100644 --- a/processes_test.go +++ b/processes_test.go @@ -10,17 +10,6 @@ import ( "github.com/remind101/empire/pkg/constraints" ) -func TestProcessesQuery(t *testing.T) { - release := &Release{ID: "1234"} - - tests := scopeTests{ - {ProcessesQuery{}, "", []interface{}{}}, - {ProcessesQuery{Release: release}, "WHERE (release_id = $1)", []interface{}{release.ID}}, - } - - tests.Run(t) -} - func TestNewFormation(t *testing.T) { tests := []struct { f Formation diff --git a/releases.go b/releases.go index 420fb430c..829689cd4 100644 --- a/releases.go +++ b/releases.go @@ -44,6 +44,17 @@ func (r *Release) Formation() Formation { return f } +// Process return the Process with the given type. +func (r *Release) Process(t ProcessType) *Process { + for _, p := range r.Processes { + if p.Type == t { + return p + } + } + + return nil +} + // BeforeCreate sets created_at before inserting. func (r *Release) BeforeCreate() error { t := timex.Now() @@ -78,9 +89,6 @@ func (q ReleasesQuery) Scope(db *gorm.DB) *gorm.DB { scope = append(scope, Range(q.Range.WithDefaults(q.DefaultRange()))) - // Preload all the things. - scope = append(scope, Preload("App", "Config", "Slug", "Processes")) - return scope.Scope(db) } @@ -94,42 +102,106 @@ func (q ReleasesQuery) DefaultRange() headerutil.Range { } } -// ReleasesFind returns the first matching release. -func (s *store) ReleasesFind(scope Scope) (*Release, error) { +// releasesService is a service for creating and rolling back a Release. +type releasesService struct { + *Empire +} + +// Create creates a new release then submits it to the scheduler. +func (s *releasesService) Create(ctx context.Context, db *gorm.DB, r *Release) (*Release, error) { + // Lock all releases for the given application to ensure that the + // release version is updated automically. + if err := db.Exec(`select 1 from releases where app_id = ? for update`, r.App.ID).Error; err != nil { + return r, err + } + + // Create a new formation for this release. + if err := createFormation(db, r); err != nil { + return r, err + } + + r, err := releasesCreate(db, r) + if err != nil { + return r, err + } + + // Schedule the new release onto the cluster. + return r, s.Release(ctx, r) +} + +// Rolls back to a specific release version. +func (s *releasesService) Rollback(ctx context.Context, db *gorm.DB, opts RollbackOpts) (*Release, error) { + app, version := opts.App, opts.Version + r, err := releasesFind(db, ReleasesQuery{App: app, Version: &version}) + if err != nil { + return nil, err + } + + desc := fmt.Sprintf("Rollback to v%d", version) + return s.Create(ctx, db, &Release{ + App: app, + Config: r.Config, + Slug: r.Slug, + Description: desc, + }) +} + +// Release submits a release to the scheduler. +func (s *releasesService) Release(ctx context.Context, release *Release) error { + a := newServiceApp(release) + return s.Scheduler.Submit(ctx, a) +} + +// ReleaseApp will find the last release for an app and release it. +func (s *releasesService) ReleaseApp(ctx context.Context, db *gorm.DB, app *App) error { + release, err := releasesFind(db, ReleasesQuery{App: app}) + if err != nil { + if err == gorm.RecordNotFound { + return ErrNoReleases + } + + return err + } + + if release == nil { + return nil + } + + return s.Release(ctx, release) +} + +// These associations are always available on a Release. +var releasesPreload = Preload("App", "Config", "Slug", "Processes") + +// releasesFind returns the first matching release. +func releasesFind(db *gorm.DB, scope Scope) (*Release, error) { var release Release - if err := s.First(scope, &release); err != nil { + scope = ComposedScope{releasesPreload, scope} + if err := first(db, scope, &release); err != nil { return &release, err } - if err := s.attachPorts(&release); err != nil { + if err := attachPorts(db, &release); err != nil { return &release, err } return &release, nil } -// Releases returns all releases matching the scope. -func (s *store) Releases(scope Scope) ([]*Release, error) { +// releases returns all releases matching the scope. +func releases(db *gorm.DB, scope Scope) ([]*Release, error) { var releases []*Release - return releases, s.Find(scope, &releases) -} - -// ReleasesCreate persists a release. -func (s *store) ReleasesCreate(r *Release) (*Release, error) { - if err := s.attachPorts(r); err != nil { - return r, err - } - - return releasesCreate(s.db, r) + scope = ComposedScope{releasesPreload, scope} + return releases, find(db, scope, &releases) } // attachPorts returns a map of ports for a release. It will allocate new ports to an app if need be. -func (s *store) attachPorts(r *Release) error { +func attachPorts(db *gorm.DB, r *Release) error { for _, p := range r.Processes { if p.Type == WebProcessType { // TODO: Support a port per process, allowing more than one process to expose a port. - port, err := s.PortsFindOrCreateByApp(r.App) + port, err := portsFindOrCreateByApp(db, r.App) if err != nil { return err } @@ -139,32 +211,11 @@ func (s *store) attachPorts(r *Release) error { return nil } -// releasesService is a service for creating and rolling back a Release. -type releasesService struct { - *Empire -} - -// ReleasesCreate creates the release, then sets the current process formation on the release. -func (s *releasesService) ReleasesCreate(ctx context.Context, r *Release) (*Release, error) { - // Create a new formation for this release. - if err := s.createFormation(r); err != nil { - return nil, err - } - - r, err := s.store.ReleasesCreate(r) - if err != nil { - return r, err - } - - // Schedule the new release onto the cluster. - return r, s.releaser.Release(ctx, r) -} - -func (s *releasesService) createFormation(release *Release) error { +func createFormation(db *gorm.DB, release *Release) error { var existing Formation // Get the old release, so we can copy the Formation. - last, err := s.store.ReleasesFind(ReleasesQuery{App: release.App}) + last, err := releasesFind(db, ReleasesQuery{App: release.App}) if err != nil { if err != gorm.RecordNotFound { return err @@ -179,30 +230,11 @@ func (s *releasesService) createFormation(release *Release) error { return nil } -// Rolls back to a specific release version. -func (s *releasesService) Rollback(ctx context.Context, opts RollbackOpts) (*Release, error) { - app, version := opts.App, opts.Version - r, err := s.store.ReleasesFind(ReleasesQuery{App: app, Version: &version}) - if err != nil { - return nil, err - } - - desc := fmt.Sprintf("Rollback to v%d", version) - return s.ReleasesCreate(ctx, &Release{ - App: app, - Config: r.Config, - Slug: r.Slug, - Description: desc, - }) -} - -// ReleasesLastVersion returns the last ReleaseVersion for the given App. This -// function also ensures that the last release is locked until the transaction -// is commited, so the release version can be incremented atomically. +// ReleasesLastVersion returns the last ReleaseVersion for the given App. func releasesLastVersion(db *gorm.DB, appID string) (int, error) { var version int - rows, err := db.Raw(`select version from releases where app_id = ? order by version desc for update`, appID).Rows() + rows, err := db.Raw(`select version from releases where app_id = ? order by version desc`, appID).Rows() if err != nil { return version, err } @@ -218,60 +250,26 @@ func releasesLastVersion(db *gorm.DB, appID string) (int, error) { // releasesCreate creates a new Release and inserts it into the database. func releasesCreate(db *gorm.DB, release *Release) (*Release, error) { - t := db.Begin() + if err := attachPorts(db, release); err != nil { + return release, err + } // Get the last release version for this app. - v, err := releasesLastVersion(t, release.App.ID) + v, err := releasesLastVersion(db, release.App.ID) if err != nil { - t.Rollback() return release, err } // Increment the release version. release.Version = v + 1 - if err := t.Create(release).Error; err != nil { - t.Rollback() - return release, err - } - - if err := t.Commit().Error; err != nil { - t.Rollback() + if err := db.Create(release).Error; err != nil { return release, err } return release, nil } -type releaser struct { - *Empire -} - -// ScheduleRelease creates jobs for every process and instance count and -// schedules them onto the cluster. -func (r *releaser) Release(ctx context.Context, release *Release) error { - a := newServiceApp(release) - return r.Scheduler.Submit(ctx, a) -} - -// ReleaseApp will find the last release for an app and release it. -func (r *releaser) ReleaseApp(ctx context.Context, app *App) error { - release, err := r.store.ReleasesFind(ReleasesQuery{App: app}) - if err != nil { - if err == gorm.RecordNotFound { - return ErrNoReleases - } - - return err - } - - if release == nil { - return nil - } - - return r.Release(ctx, release) -} - func newServiceApp(release *Release) *scheduler.App { var processes []*scheduler.Process diff --git a/runner.go b/runner.go index 569a97d92..f624d9cbe 100644 --- a/runner.go +++ b/runner.go @@ -7,7 +7,7 @@ type runnerService struct { } func (r *runnerService) Run(ctx context.Context, opts RunOpts) error { - release, err := r.store.ReleasesFind(ReleasesQuery{App: opts.App}) + release, err := releasesFind(r.db, ReleasesQuery{App: opts.App}) if err != nil { return err } diff --git a/server/heroku/domains.go b/server/heroku/domains.go index e198fcbe3..75801efe7 100644 --- a/server/heroku/domains.go +++ b/server/heroku/domains.go @@ -4,9 +4,9 @@ import ( "fmt" "net/http" - "github.com/remind101/empire/pkg/heroku" "github.com/jinzhu/gorm" "github.com/remind101/empire" + "github.com/remind101/empire/pkg/heroku" "github.com/remind101/pkg/httpx" "golang.org/x/net/context" ) @@ -64,7 +64,7 @@ func (h *PostDomains) ServeHTTPContext(ctx context.Context, w http.ResponseWrite AppID: a.ID, Hostname: form.Hostname, } - d, err := h.DomainsCreate(domain) + d, err := h.DomainsCreate(ctx, domain) if err != nil { if err == empire.ErrDomainInUse { return fmt.Errorf("%s is currently in use by another app.", domain.Hostname) @@ -103,7 +103,7 @@ func (h *DeleteDomain) ServeHTTPContext(ctx context.Context, w http.ResponseWrit return err } - if err = h.DomainsDestroy(d); err != nil { + if err = h.DomainsDestroy(ctx, d); err != nil { return err } diff --git a/slugs.go b/slugs.go index e66948bfe..240dd9a43 100644 --- a/slugs.go +++ b/slugs.go @@ -15,12 +15,7 @@ type Slug struct { ProcessTypes CommandMap } -// SlugsCreate persists the slug. -func (s *store) SlugsCreate(slug *Slug) (*Slug, error) { - return slugsCreate(s.db, slug) -} - -// SlugsCreate inserts a Slug into the database. +// slugsCreate inserts a Slug into the database. func slugsCreate(db *gorm.DB, slug *Slug) (*Slug, error) { return slug, db.Create(slug).Error } @@ -31,20 +26,20 @@ type slugsService struct { } // SlugsCreateByImage creates a Slug for the given image. -func (s *slugsService) SlugsCreateByImage(ctx context.Context, img image.Image, out io.Writer) (*Slug, error) { - return slugsCreateByImage(ctx, s.store, s.ExtractProcfile, img, out) +func (s *slugsService) Create(ctx context.Context, db *gorm.DB, img image.Image, out io.Writer) (*Slug, error) { + return slugsCreateByImage(ctx, db, s.ExtractProcfile, img, out) } // SlugsCreateByImage first attempts to find a matching slug for the image. If // it's not found, it will fallback to extracting the process types using the // provided extractor, then create a slug. -func slugsCreateByImage(ctx context.Context, store *store, e ProcfileExtractor, img image.Image, out io.Writer) (*Slug, error) { +func slugsCreateByImage(ctx context.Context, db *gorm.DB, e ProcfileExtractor, img image.Image, out io.Writer) (*Slug, error) { slug, err := slugsExtract(ctx, e, img, out) if err != nil { return slug, err } - return store.SlugsCreate(slug) + return slugsCreate(db, slug) } // SlugsExtract extracts the process types from the image, then returns a new diff --git a/store.go b/store.go deleted file mode 100644 index 6b2651f5b..000000000 --- a/store.go +++ /dev/null @@ -1,142 +0,0 @@ -package empire - -import ( - "fmt" - - "github.com/jinzhu/gorm" - "github.com/remind101/empire/pkg/headerutil" -) - -// Scope is an interface that scopes a gorm.DB. Scopes are used in -// ThingsFirst and ThingsAll methods on the store for filtering/querying. -type Scope interface { - Scope(*gorm.DB) *gorm.DB -} - -// ScopeFunc implements the Scope interface for functions. -type ScopeFunc func(*gorm.DB) *gorm.DB - -// Scope implements the Scope interface. -func (f ScopeFunc) Scope(db *gorm.DB) *gorm.DB { - return f(db) -} - -// All returns a scope that simply returns the db. -var All = ScopeFunc(func(db *gorm.DB) *gorm.DB { - return db -}) - -// ID returns a Scope that will find the item by id. -func ID(id string) Scope { - return FieldEquals("id", id) -} - -// ForApp returns a Scope that will filter items belonging the the given app. -func ForApp(app *App) Scope { - return FieldEquals("app_id", app.ID) -} - -// ComposedScope is an implementation of the Scope interface that chains the -// scopes together. -type ComposedScope []Scope - -// Scope implements the Scope interface. -func (s ComposedScope) Scope(db *gorm.DB) *gorm.DB { - for _, s := range s { - db = s.Scope(db) - } - - return db -} - -// FieldEquals returns a Scope that filters on a field. -func FieldEquals(field string, v interface{}) Scope { - return ScopeFunc(func(db *gorm.DB) *gorm.DB { - return db.Where(fmt.Sprintf("%s = ?", field), v) - }) -} - -// Preload returns a Scope that preloads the associations. -func Preload(associations ...string) Scope { - var scope ComposedScope - - for _, a := range associations { - aa := a - scope = append(scope, ScopeFunc(func(db *gorm.DB) *gorm.DB { - return db.Preload(aa) - })) - } - - return scope -} - -// Order returns a Scope that orders the results. -func Order(order string) Scope { - return ScopeFunc(func(db *gorm.DB) *gorm.DB { - return db.Order(order) - }) -} - -// Limit returns a Scope that limits the results. -func Limit(limit int) Scope { - return ScopeFunc(func(db *gorm.DB) *gorm.DB { - return db.Limit(limit) - }) -} - -// Range returns a Scope that limits and orders the results. -func Range(r headerutil.Range) Scope { - var scope ComposedScope - - if r.Max != nil { - scope = append(scope, Limit(*r.Max)) - } - - if r.Sort != nil && r.Order != nil { - order := fmt.Sprintf("%s %s", *r.Sort, *r.Order) - scope = append(scope, Order(order)) - } - - return scope -} - -// store provides methods for CRUD'ing things. -type store struct { - db *gorm.DB -} - -// Scope applies the scope to the gorm.DB. -func (s *store) Scope(scope Scope) *gorm.DB { - return scope.Scope(s.db) -} - -// First applies the scope to the gorm.DB and finds the first record, populating -// v. -func (s *store) First(scope Scope, v interface{}) error { - return s.Scope(scope).First(v).Error -} - -// Find applies the scope to the gorm.DB and finds the matching records, -// populating v. -func (s *store) Find(scope Scope, v interface{}) error { - return s.Scope(scope).Find(v).Error -} - -func (s *store) Reset() error { - var err error - exec := func(sql string) { - if err == nil { - err = s.db.Exec(sql).Error - } - } - - exec(`TRUNCATE TABLE apps CASCADE`) - exec(`TRUNCATE TABLE ports CASCADE`) - exec(`INSERT INTO ports (port) (SELECT generate_series(9000,10000))`) - - return err -} - -func (s *store) IsHealthy() bool { - return s.db.DB().Ping() == nil -} diff --git a/tests/empire/empire_test.go b/tests/empire/empire_test.go index e9411779d..919ff1ef8 100644 --- a/tests/empire/empire_test.go +++ b/tests/empire/empire_test.go @@ -1,6 +1,8 @@ package empire_test import ( + "errors" + "io" "io/ioutil" "testing" "time" @@ -10,14 +12,13 @@ import ( "github.com/remind101/empire" "github.com/remind101/empire/empiretest" "github.com/remind101/empire/pkg/image" + "github.com/remind101/empire/procfile" "github.com/remind101/empire/scheduler" "github.com/remind101/pkg/timex" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" ) -const fakeUUID = "01234567-89ab-cdef-0123-456789abcdef" - var fakeNow = time.Date(2015, time.January, 1, 1, 1, 1, 1, time.UTC) // Stubs out time.Now in empire. @@ -118,6 +119,92 @@ func TestEmpire_Deploy(t *testing.T) { s.AssertExpectations(t) } +func TestEmpire_Deploy_ImageNotFound(t *testing.T) { + e := empiretest.NewEmpire(t) + s := new(mockScheduler) + e.Scheduler = s + e.ExtractProcfile = func(ctx context.Context, img image.Image, w io.Writer) (procfile.Procfile, error) { + return nil, errors.New("image not found") + } + + // Deploying an image to an app that doesn't exist will create a new + // app. + _, err := e.Deploy(context.Background(), empire.DeploymentsCreateOpts{ + User: &empire.User{Name: "ejholmes"}, + Output: ioutil.Discard, + Image: image.Image{Repository: "remind101/acme-inc"}, + }) + assert.Error(t, err) + + // If there's an error deploying, then the transaction should be rolled + // backed and no apps should exist. + apps, err := e.Apps(empire.AppsQuery{}) + assert.NoError(t, err) + assert.Equal(t, 0, len(apps)) + + s.AssertExpectations(t) +} + +func TestEmpire_Deploy_Concurrent(t *testing.T) { + e := empiretest.NewEmpire(t) + s := new(mockScheduler) + e.Scheduler = scheduler.NewFakeScheduler() + e.ExtractProcfile = func(ctx context.Context, img image.Image, w io.Writer) (procfile.Procfile, error) { + return nil, nil + } + + user := &empire.User{Name: "ejholmes"} + + // Create the first release for this app. + r, err := e.Deploy(context.Background(), empire.DeploymentsCreateOpts{ + User: user, + Output: ioutil.Discard, + Image: image.Image{Repository: "remind101/acme-inc"}, + }) + assert.NoError(t, err) + assert.Equal(t, 1, r.Version) + + // We'll use the procfile extractor to synchronize two concurrent + // deployments. + v2Started, v3Started := make(chan struct{}), make(chan struct{}) + e.ExtractProcfile = func(ctx context.Context, img image.Image, w io.Writer) (procfile.Procfile, error) { + switch img.Tag { + case "v2": + close(v2Started) + <-v3Started + case "v3": + close(v3Started) + } + return nil, nil + } + + v2Done := make(chan struct{}) + go func() { + r, err = e.Deploy(context.Background(), empire.DeploymentsCreateOpts{ + User: user, + Output: ioutil.Discard, + Image: image.Image{Repository: "remind101/acme-inc", Tag: "v2"}, + }) + assert.NoError(t, err) + assert.Equal(t, 2, r.Version) + close(v2Done) + }() + + <-v2Started + + r, err = e.Deploy(context.Background(), empire.DeploymentsCreateOpts{ + User: user, + Output: ioutil.Discard, + Image: image.Image{Repository: "remind101/acme-inc", Tag: "v3"}, + }) + assert.NoError(t, err) + assert.Equal(t, 3, r.Version) + + <-v2Done + + s.AssertExpectations(t) +} + func TestEmpire_Set(t *testing.T) { e := empiretest.NewEmpire(t) s := new(mockScheduler)