Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Commit

Permalink
Extract types to separate package
Browse files Browse the repository at this point in the history
  • Loading branch information
scoiatael committed Feb 18, 2017
1 parent 88313cd commit dcba080
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 55 deletions.
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
// Place your settings in this file to overwrite default and user settings.
{
}
1 change: 1 addition & 0 deletions actions/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ func (a Migrate) Run(c Context) error {
if err != nil {
return errors.Wrap(err, "Obtaining session failed")
}
defer migrationSession.Close()
for name, m := range c.Migrations() {
shouldRun, err := migrationSession.ShouldRunMigration(name)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions actions/read_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@ import (
"fmt"

"github.com/pkg/errors"
"github.com/scoiatael/archai/persistence"
"github.com/scoiatael/archai/types"
)

type ReadEvents struct {
Stream string
Cursor string
Amount int

Output chan (persistence.Event)
Output chan (types.Event)
}

const minTimeuuid = "00000000-0000-1000-8080-808080808080"
Expand All @@ -23,6 +23,7 @@ func (re ReadEvents) Run(c Context) error {
if err != nil {
return errors.Wrap(err, "Obtaining session failed")
}
defer session.Close()
if len(re.Cursor) == 0 {
re.Cursor = minTimeuuid
}
Expand Down
6 changes: 3 additions & 3 deletions actions/read_events_to_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"io"

"github.com/scoiatael/archai/persistence"
"github.com/scoiatael/archai/types"
)

type ReadEventsToStream struct {
Expand All @@ -15,15 +15,15 @@ type ReadEventsToStream struct {
Output io.Writer
}

func printEventToStream(out io.Writer, ev persistence.Event) error {
func printEventToStream(out io.Writer, ev types.Event) error {
js := string(ev.Blob)
str := fmt.Sprintf("%s - %s: {%v} %s\n", ev.Stream, ev.ID, ev.Meta, js)
_, err := out.Write([]byte(str))
return err
}

func (res ReadEventsToStream) Run(c Context) error {
ch := make(chan (persistence.Event), 100)
ch := make(chan (types.Event), 100)
err := ReadEvents{Stream: res.Stream, Cursor: res.Cursor, Amount: 10, Output: ch}.Run(c)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions actions/write_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func (we WriteEvent) Run(c Context) error {
if err != nil {
return errors.Wrap(err, "Obtaining session failed")
}
defer session.Close()
we.Meta["version"] = c.Version()
return errors.Wrap(session.WriteEvent(we.Stream, we.Payload, we.Meta), fmt.Sprintf("Error writing event to stream %s", we.Stream))
}
33 changes: 0 additions & 33 deletions persistence/event.go

This file was deleted.

15 changes: 8 additions & 7 deletions persistence/migration_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ import (
type MigrationSession interface {
ShouldRunMigration(string) (bool, error)
DidRunMigration(string) error
Query(string) error
Exec(string) error
Close()
}

type CassandraMigrationSession struct {
session *gocql.Session
*gocql.Session
}

func (sess *CassandraMigrationSession) Query(query string) error {
return sess.session.Query(query).Exec()
func (sess *CassandraMigrationSession) Exec(query string) error {
return sess.Query(query).Exec()
}

const migrationTable = "archai_migrations"
Expand All @@ -37,16 +38,16 @@ var findMigration = fmt.Sprintf(`SELECT name FROM %s WHERE name = ? LIMIT 1`, mi
var insertMigration = fmt.Sprintf(`INSERT INTO %s (name) VALUES (?)`, migrationTable)

func (sess *CassandraMigrationSession) ShouldRunMigration(name string) (bool, error) {
if err := sess.session.Query(createMigrationTable).Exec(); err != nil {
if err := sess.Query(createMigrationTable).Exec(); err != nil {
return false, errors.Wrap(err, "Query to createMigrationTable failed")
}
log.Println("Looking for migration ", name)
iter := sess.session.Query(findMigration, name).Iter()
iter := sess.Query(findMigration, name).Iter()
found := iter.Scan(nil)
err := iter.Close()
return !found, errors.Wrap(err, "Closing iterator for findMigration failed")
}

func (sess *CassandraMigrationSession) DidRunMigration(name string) error {
return sess.session.Query(insertMigration, name).Exec()
return sess.Query(insertMigration, name).Exec()
}
2 changes: 1 addition & 1 deletion persistence/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type SimpleMigration struct {
}

func (simpleMigration SimpleMigration) Run(session MigrationSession) error {
err := session.Query(simpleMigration.Query)
err := session.Exec(simpleMigration.Query)
return errors.Wrap(err, "Query to execute SimpleMigration failed")
}

Expand Down
8 changes: 5 additions & 3 deletions persistence/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,24 @@ func (cp *CassandraProvider) Session() (Session, error) {
cluster.Keyspace = cp.Keyspace
cluster.Consistency = gocql.Quorum
sess, err := cluster.CreateSession()
return &CassandraSession{session: sess}, errors.Wrap(err, "CreateSession failed")
return &CassandraSession{sess}, errors.Wrap(err, "CreateSession failed")
}

const createKeySpace = `CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`

func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) {
cluster := cp.newCluster()
cluster.Consistency = gocql.All
sess, err := cluster.CreateSession()
if err != nil {
return &CassandraMigrationSession{}, errors.Wrap(err, "CreateSession failed")
}
err = sess.Query(fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`, cp.Keyspace)).Exec()
err = sess.Query(fmt.Sprintf(createKeySpace, cp.Keyspace)).Exec()
if err != nil {
return &CassandraMigrationSession{}, errors.Wrap(err, "Query to CreateKeyspace failed")
}
cluster.Keyspace = cp.Keyspace
sess, err = cluster.CreateSession()

return &CassandraMigrationSession{session: sess}, errors.Wrap(err, "CreateSession failed")
return &CassandraMigrationSession{sess}, errors.Wrap(err, "CreateSession failed")
}
35 changes: 29 additions & 6 deletions persistence/session.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,54 @@
package persistence

import (
"fmt"

"github.com/gocql/gocql"
"github.com/pkg/errors"
"github.com/scoiatael/archai/types"
)

type Session interface {
WriteEvent(string, []byte, map[string]string) error
ReadEvents(string, string, int) ([]Event, error)
ReadEvents(string, string, int) ([]types.Event, error)
Close()
}

type CassandraSession struct {
session *gocql.Session
*gocql.Session
}

const insertEvent = `INSERT INTO events (id, stream, blob, meta) VALUES (now(), ?, ?, ?)`

func (sess *CassandraSession) WriteEvent(stream string, blob []byte, meta map[string]string) error {
return errors.Wrap(sess.session.Query(insertEvent, stream, blob, meta).Exec(), "Error writing event to Cassandra")
return errors.Wrap(sess.Query(insertEvent, stream, blob, meta).Exec(), "Error writing event to Cassandra")
}

func eventFromRow(row map[string]interface{}) (types.Event, error) {
var conv bool
ev := types.Event{}
id, conv := row["id"].(gocql.UUID)
if !conv {
return ev, fmt.Errorf("Failed converting %v to UUID", row["id"])
}
ev.ID = id.String()
ev.Blob, conv = row["blob"].([]byte)
if !conv {
return ev, fmt.Errorf("Failed converting %v to blob", row["blob"])
}
ev.Meta, conv = row["meta"].(map[string]string)
if !conv {
return ev, fmt.Errorf("Failed converting %v to map", row["map"])
}
return ev, nil
}

const readEvent = `SELECT id, blob, meta FROM events WHERE stream = ? AND id > ? LIMIT ?`

func (sess *CassandraSession) ReadEvents(stream string, cursor string, amount int) ([]Event, error) {
iter := sess.session.Query(readEvent, stream, cursor, amount).Iter()
func (sess *CassandraSession) ReadEvents(stream string, cursor string, amount int) ([]types.Event, error) {
iter := sess.Query(readEvent, stream, cursor, amount).Iter()
rows, err := iter.SliceMap()
events := make([]Event, len(rows))
events := make([]types.Event, len(rows))
for i, r := range rows {
events[i], err = eventFromRow(r)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions types/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package types

type Event struct {
ID string
Stream string
Blob []byte
Meta map[string]string
}

0 comments on commit dcba080

Please sign in to comment.