From fc6b7f344372a51c03ddce2c39507a2a09dabb78 Mon Sep 17 00:00:00 2001 From: lczaplinski Date: Mon, 13 Feb 2017 23:41:46 +0100 Subject: [PATCH] Initial migrations --- actions/context.go | 14 +++++++++ actions/migrate.go | 35 ++++++++++++++++++++++ config.go | 26 ++++++++++++++++ main.go | 21 +++++++++++++ persistence/event.go | 4 +++ persistence/migration_session.go | 51 ++++++++++++++++++++++++++++++++ persistence/migrations.go | 24 +++++++++++++++ persistence/provider.go | 46 ++++++++++++++++++++++++++++ persistence/session.go | 25 ++++++++++++++++ version.go | 3 ++ 10 files changed, 249 insertions(+) create mode 100644 actions/context.go create mode 100644 actions/migrate.go create mode 100644 config.go create mode 100644 main.go create mode 100644 persistence/event.go create mode 100644 persistence/migration_session.go create mode 100644 persistence/migrations.go create mode 100644 persistence/provider.go create mode 100644 persistence/session.go create mode 100644 version.go diff --git a/actions/context.go b/actions/context.go new file mode 100644 index 0000000..d6989be --- /dev/null +++ b/actions/context.go @@ -0,0 +1,14 @@ +package actions + +import ( + "github.com/scoiatael/archai/persistence" +) + +type Context interface { + Persistence() persistence.Provider + Migrations() map[string]persistence.Migration +} + +type Action interface { + Run(Context) error +} diff --git a/actions/migrate.go b/actions/migrate.go new file mode 100644 index 0000000..f013956 --- /dev/null +++ b/actions/migrate.go @@ -0,0 +1,35 @@ +package actions + +import ( + "log" +) + +// Migrate implements Action interface +type Migrate struct{} + +// Run all migrations +func (a Migrate) Run(c Context) error { + persistenceProvider := c.Persistence() + migrationSession, err := persistenceProvider.MigrationSession() + if err != nil { + return err + } + for name, m := range c.Migrations() { + shouldRun, err := migrationSession.ShouldRunMigration(name) + if err != nil { + return err + } + if shouldRun { + log.Println("Executing ", name) + err = m.Run(migrationSession) + if err != nil { + return err + } + err = migrationSession.DidRunMigration(name) + if err != nil { + return err + } + } + } + return nil +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..a52bc99 --- /dev/null +++ b/config.go @@ -0,0 +1,26 @@ +package main + +import ( + "github.com/scoiatael/archai/actions" + "github.com/scoiatael/archai/persistence" +) + +type Config struct { + keyspace string + hosts []string + actions []actions.Action +} + +func (c Config) Migrations() map[string]persistence.Migration { + m := make(map[string]persistence.Migration) + m["create_events_table"] = persistence.CreateEventsTable + return m +} + +func (c Config) Persistence() persistence.Provider { + hosts := make([]string, 1) + hosts[0] = "127.0.0.1" + provider := persistence.CassandraProvider{Hosts: hosts, + Keyspace: c.keyspace} + return &provider +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..e11ff99 --- /dev/null +++ b/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "os" + + "github.com/scoiatael/archai/actions" + "github.com/urfave/cli" +) + +func main() { + app := cli.NewApp() + app.Name = "archai" + app.Usage = "eventstore replacement" + app.Version = Version + app.Action = func(c *cli.Context) error { + config := Config{keyspace: "archai_test"} + return actions.Migrate{}.Run(config) + } + + app.Run(os.Args) +} diff --git a/persistence/event.go b/persistence/event.go new file mode 100644 index 0000000..0fe5d49 --- /dev/null +++ b/persistence/event.go @@ -0,0 +1,4 @@ +package persistence + +type Event struct { +} diff --git a/persistence/migration_session.go b/persistence/migration_session.go new file mode 100644 index 0000000..00f6899 --- /dev/null +++ b/persistence/migration_session.go @@ -0,0 +1,51 @@ +package persistence + +import ( + "fmt" + + "log" + + "github.com/gocql/gocql" +) + +type MigrationSession interface { + ShouldRunMigration(string) (bool, error) + DidRunMigration(string) error + Query(string) error +} + +type CassandraMigrationSession struct { + session *gocql.Session +} + +func (sess *CassandraMigrationSession) Query(query string) error { + return sess.session.Query(query).Exec() +} + +const migrationTable = "archai_migrations" + +var createMigrationTable = fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ( + name VARCHAR, + PRIMARY KEY (name) + ) +`, migrationTable) + +var findMigration = fmt.Sprintf(`SELECT name FROM %s WHERE name = ? LIMIT 1`, migrationTable) + +var insertMigration = fmt.Sprintf(`INSERT INTO %s (name) VALUES (?)`, migrationTable) + +func (sess *CassandraMigrationSession) ShouldRunMigration(name string) (bool, error) { + if err := sess.session.Query(createMigrationTable).Exec(); err != nil { + return false, err + } + log.Println("Looking for migration ", name) + iter := sess.session.Query(findMigration, name).Iter() + found := iter.Scan(nil) + err := iter.Close() + return !found, err +} + +func (sess *CassandraMigrationSession) DidRunMigration(name string) error { + return sess.session.Query(insertMigration, name).Exec() +} diff --git a/persistence/migrations.go b/persistence/migrations.go new file mode 100644 index 0000000..9e93e5f --- /dev/null +++ b/persistence/migrations.go @@ -0,0 +1,24 @@ +package persistence + +type Migration interface { + Run(MigrationSession) error +} + +type SimpleMigration struct { + Query string +} + +func (simpleMigration SimpleMigration) Run(session MigrationSession) error { + err := session.Query(simpleMigration.Query) + return err +} + +var CreateEventsTable = SimpleMigration{Query: ` + CREATE TABLE IF NOT EXISTS events ( + id TIMEUUID, + blob BLOB, + stream VARCHAR, + meta MAP, + PRIMARY KEY (stream, id) + ) +`} diff --git a/persistence/provider.go b/persistence/provider.go new file mode 100644 index 0000000..8f03fa7 --- /dev/null +++ b/persistence/provider.go @@ -0,0 +1,46 @@ +package persistence + +import ( + "fmt" + + "github.com/gocql/gocql" +) + +type Provider interface { + Session() (Session, error) + MigrationSession() (MigrationSession, error) +} + +type CassandraProvider struct { + Hosts []string + Keyspace string +} + +func (cp *CassandraProvider) newCluster() *gocql.ClusterConfig { + return gocql.NewCluster(cp.Hosts...) +} + +func (cp *CassandraProvider) Session() (Session, error) { + cluster := cp.newCluster() + cluster.Keyspace = cp.Keyspace + cluster.Consistency = gocql.Quorum + sess, err := cluster.CreateSession() + return &CassandraSession{session: sess}, err +} + +func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) { + cluster := cp.newCluster() + cluster.Consistency = gocql.All + sess, err := cluster.CreateSession() + if err != nil { + return &CassandraMigrationSession{}, err + } + err = sess.Query(fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`, cp.Keyspace)).Exec() + if err != nil { + return &CassandraMigrationSession{}, err + } + cluster.Keyspace = cp.Keyspace + sess, err = cluster.CreateSession() + + return &CassandraMigrationSession{session: sess}, err +} diff --git a/persistence/session.go b/persistence/session.go new file mode 100644 index 0000000..faa61fb --- /dev/null +++ b/persistence/session.go @@ -0,0 +1,25 @@ +package persistence + +import ( + "github.com/gocql/gocql" +) + +type Session interface { + WriteEvent(string, []byte, map[string]string) error + ReadEvents(string, string, int) []Event +} + +type CassandraSession struct { + session *gocql.Session +} + +const insertEvent = `INSERT INTO events (id, stream, blob, meta) VALUES (now(), ?, ?, ?)` + +func (sess *CassandraSession) WriteEvent(stream string, blob []byte, meta map[string]string) error { + return sess.session.Query(insertEvent, stream, blob, meta).Exec() +} + +func (sess *CassandraSession) ReadEvents(stream string, cursor string, amount int) []Event { + events := make([]Event, 0) + return events +} diff --git a/version.go b/version.go new file mode 100644 index 0000000..0354e88 --- /dev/null +++ b/version.go @@ -0,0 +1,3 @@ +package main + +const Version = "0.1.0"