diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..20af2f6 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +// Place your settings in this file to overwrite default and user settings. +{ +} \ No newline at end of file diff --git a/actions/migrate.go b/actions/migrate.go index 0c998d2..60b74f2 100644 --- a/actions/migrate.go +++ b/actions/migrate.go @@ -17,6 +17,7 @@ func (a Migrate) Run(c Context) error { if err != nil { return errors.Wrap(err, "Obtaining session failed") } + defer migrationSession.Close() for name, m := range c.Migrations() { shouldRun, err := migrationSession.ShouldRunMigration(name) if err != nil { diff --git a/actions/read_events.go b/actions/read_events.go index 7a3c284..9fc54c0 100644 --- a/actions/read_events.go +++ b/actions/read_events.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/pkg/errors" - "github.com/scoiatael/archai/persistence" + "github.com/scoiatael/archai/types" ) type ReadEvents struct { @@ -12,7 +12,7 @@ type ReadEvents struct { Cursor string Amount int - Output chan (persistence.Event) + Output chan (types.Event) } const minTimeuuid = "00000000-0000-1000-8080-808080808080" @@ -23,6 +23,7 @@ func (re ReadEvents) Run(c Context) error { if err != nil { return errors.Wrap(err, "Obtaining session failed") } + defer session.Close() if len(re.Cursor) == 0 { re.Cursor = minTimeuuid } diff --git a/actions/read_events_to_stream.go b/actions/read_events_to_stream.go index 8ef75ca..7c193f7 100644 --- a/actions/read_events_to_stream.go +++ b/actions/read_events_to_stream.go @@ -4,7 +4,7 @@ import ( "fmt" "io" - "github.com/scoiatael/archai/persistence" + "github.com/scoiatael/archai/types" ) type ReadEventsToStream struct { @@ -15,7 +15,7 @@ type ReadEventsToStream struct { Output io.Writer } -func printEventToStream(out io.Writer, ev persistence.Event) error { +func printEventToStream(out io.Writer, ev types.Event) error { js := string(ev.Blob) str := fmt.Sprintf("%s - %s: {%v} %s\n", ev.Stream, ev.ID, ev.Meta, js) _, err := out.Write([]byte(str)) @@ -23,7 +23,7 @@ func printEventToStream(out io.Writer, ev persistence.Event) error { } func (res ReadEventsToStream) Run(c Context) error { - ch := make(chan (persistence.Event), 100) + ch := make(chan (types.Event), 100) err := ReadEvents{Stream: res.Stream, Cursor: res.Cursor, Amount: 10, Output: ch}.Run(c) if err != nil { return err diff --git a/actions/write_event.go b/actions/write_event.go index 7630a8c..26a20a3 100644 --- a/actions/write_event.go +++ b/actions/write_event.go @@ -18,6 +18,7 @@ func (we WriteEvent) Run(c Context) error { if err != nil { return errors.Wrap(err, "Obtaining session failed") } + defer session.Close() we.Meta["version"] = c.Version() return errors.Wrap(session.WriteEvent(we.Stream, we.Payload, we.Meta), fmt.Sprintf("Error writing event to stream %s", we.Stream)) } diff --git a/persistence/event.go b/persistence/event.go deleted file mode 100644 index 0e2d646..0000000 --- a/persistence/event.go +++ /dev/null @@ -1,33 +0,0 @@ -package persistence - -import ( - "fmt" - - "github.com/gocql/gocql" -) - -type Event struct { - ID string - Stream string - Blob []byte - Meta map[string]string -} - -func eventFromRow(row map[string]interface{}) (Event, error) { - var conv bool - ev := Event{} - id, conv := row["id"].(gocql.UUID) - if !conv { - return ev, fmt.Errorf("Failed converting %v to UUID", row["id"]) - } - ev.ID = id.String() - ev.Blob, conv = row["blob"].([]byte) - if !conv { - return ev, fmt.Errorf("Failed converting %v to blob", row["blob"]) - } - ev.Meta, conv = row["meta"].(map[string]string) - if !conv { - return ev, fmt.Errorf("Failed converting %v to map", row["map"]) - } - return ev, nil -} diff --git a/persistence/migration_session.go b/persistence/migration_session.go index bdffa04..8dd1515 100644 --- a/persistence/migration_session.go +++ b/persistence/migration_session.go @@ -12,15 +12,16 @@ import ( type MigrationSession interface { ShouldRunMigration(string) (bool, error) DidRunMigration(string) error - Query(string) error + Exec(string) error + Close() } type CassandraMigrationSession struct { - session *gocql.Session + *gocql.Session } -func (sess *CassandraMigrationSession) Query(query string) error { - return sess.session.Query(query).Exec() +func (sess *CassandraMigrationSession) Exec(query string) error { + return sess.Query(query).Exec() } const migrationTable = "archai_migrations" @@ -37,16 +38,16 @@ var findMigration = fmt.Sprintf(`SELECT name FROM %s WHERE name = ? LIMIT 1`, mi var insertMigration = fmt.Sprintf(`INSERT INTO %s (name) VALUES (?)`, migrationTable) func (sess *CassandraMigrationSession) ShouldRunMigration(name string) (bool, error) { - if err := sess.session.Query(createMigrationTable).Exec(); err != nil { + if err := sess.Query(createMigrationTable).Exec(); err != nil { return false, errors.Wrap(err, "Query to createMigrationTable failed") } log.Println("Looking for migration ", name) - iter := sess.session.Query(findMigration, name).Iter() + iter := sess.Query(findMigration, name).Iter() found := iter.Scan(nil) err := iter.Close() return !found, errors.Wrap(err, "Closing iterator for findMigration failed") } func (sess *CassandraMigrationSession) DidRunMigration(name string) error { - return sess.session.Query(insertMigration, name).Exec() + return sess.Query(insertMigration, name).Exec() } diff --git a/persistence/migrations.go b/persistence/migrations.go index 80873f2..23ece62 100644 --- a/persistence/migrations.go +++ b/persistence/migrations.go @@ -11,7 +11,7 @@ type SimpleMigration struct { } func (simpleMigration SimpleMigration) Run(session MigrationSession) error { - err := session.Query(simpleMigration.Query) + err := session.Exec(simpleMigration.Query) return errors.Wrap(err, "Query to execute SimpleMigration failed") } diff --git a/persistence/provider.go b/persistence/provider.go index e92be50..7c1ea0c 100644 --- a/persistence/provider.go +++ b/persistence/provider.go @@ -26,9 +26,11 @@ func (cp *CassandraProvider) Session() (Session, error) { cluster.Keyspace = cp.Keyspace cluster.Consistency = gocql.Quorum sess, err := cluster.CreateSession() - return &CassandraSession{session: sess}, errors.Wrap(err, "CreateSession failed") + return &CassandraSession{sess}, errors.Wrap(err, "CreateSession failed") } +const createKeySpace = `CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };` + func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) { cluster := cp.newCluster() cluster.Consistency = gocql.All @@ -36,12 +38,12 @@ func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) { if err != nil { return &CassandraMigrationSession{}, errors.Wrap(err, "CreateSession failed") } - err = sess.Query(fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`, cp.Keyspace)).Exec() + err = sess.Query(fmt.Sprintf(createKeySpace, cp.Keyspace)).Exec() if err != nil { return &CassandraMigrationSession{}, errors.Wrap(err, "Query to CreateKeyspace failed") } cluster.Keyspace = cp.Keyspace sess, err = cluster.CreateSession() - return &CassandraMigrationSession{session: sess}, errors.Wrap(err, "CreateSession failed") + return &CassandraMigrationSession{sess}, errors.Wrap(err, "CreateSession failed") } diff --git a/persistence/session.go b/persistence/session.go index fc6b9ab..1b89393 100644 --- a/persistence/session.go +++ b/persistence/session.go @@ -1,31 +1,54 @@ package persistence import ( + "fmt" + "github.com/gocql/gocql" "github.com/pkg/errors" + "github.com/scoiatael/archai/types" ) type Session interface { WriteEvent(string, []byte, map[string]string) error - ReadEvents(string, string, int) ([]Event, error) + ReadEvents(string, string, int) ([]types.Event, error) + Close() } type CassandraSession struct { - session *gocql.Session + *gocql.Session } const insertEvent = `INSERT INTO events (id, stream, blob, meta) VALUES (now(), ?, ?, ?)` func (sess *CassandraSession) WriteEvent(stream string, blob []byte, meta map[string]string) error { - return errors.Wrap(sess.session.Query(insertEvent, stream, blob, meta).Exec(), "Error writing event to Cassandra") + return errors.Wrap(sess.Query(insertEvent, stream, blob, meta).Exec(), "Error writing event to Cassandra") +} + +func eventFromRow(row map[string]interface{}) (types.Event, error) { + var conv bool + ev := types.Event{} + id, conv := row["id"].(gocql.UUID) + if !conv { + return ev, fmt.Errorf("Failed converting %v to UUID", row["id"]) + } + ev.ID = id.String() + ev.Blob, conv = row["blob"].([]byte) + if !conv { + return ev, fmt.Errorf("Failed converting %v to blob", row["blob"]) + } + ev.Meta, conv = row["meta"].(map[string]string) + if !conv { + return ev, fmt.Errorf("Failed converting %v to map", row["map"]) + } + return ev, nil } const readEvent = `SELECT id, blob, meta FROM events WHERE stream = ? AND id > ? LIMIT ?` -func (sess *CassandraSession) ReadEvents(stream string, cursor string, amount int) ([]Event, error) { - iter := sess.session.Query(readEvent, stream, cursor, amount).Iter() +func (sess *CassandraSession) ReadEvents(stream string, cursor string, amount int) ([]types.Event, error) { + iter := sess.Query(readEvent, stream, cursor, amount).Iter() rows, err := iter.SliceMap() - events := make([]Event, len(rows)) + events := make([]types.Event, len(rows)) for i, r := range rows { events[i], err = eventFromRow(r) if err != nil { diff --git a/types/event.go b/types/event.go new file mode 100644 index 0000000..c2e323e --- /dev/null +++ b/types/event.go @@ -0,0 +1,8 @@ +package types + +type Event struct { + ID string + Stream string + Blob []byte + Meta map[string]string +}