diff --git a/actions/context.go b/actions/context.go index d6989be..e717caa 100644 --- a/actions/context.go +++ b/actions/context.go @@ -7,6 +7,7 @@ import ( type Context interface { Persistence() persistence.Provider Migrations() map[string]persistence.Migration + Version() string } type Action interface { diff --git a/actions/migrate.go b/actions/migrate.go index f013956..0c998d2 100644 --- a/actions/migrate.go +++ b/actions/migrate.go @@ -1,7 +1,10 @@ package actions import ( + "fmt" "log" + + "github.com/pkg/errors" ) // Migrate implements Action interface @@ -12,22 +15,22 @@ func (a Migrate) Run(c Context) error { persistenceProvider := c.Persistence() migrationSession, err := persistenceProvider.MigrationSession() if err != nil { - return err + return errors.Wrap(err, "Obtaining session failed") } for name, m := range c.Migrations() { shouldRun, err := migrationSession.ShouldRunMigration(name) if err != nil { - return err + return errors.Wrap(err, fmt.Sprintf("Checking if should run migration %s failed", name)) } if shouldRun { log.Println("Executing ", name) err = m.Run(migrationSession) if err != nil { - return err + return errors.Wrap(err, fmt.Sprintf("Running migration %s failed", name)) } err = migrationSession.DidRunMigration(name) if err != nil { - return err + return errors.Wrap(err, fmt.Sprintf("Running after-migration hook for %s failed", name)) } } } diff --git a/actions/read_event.go b/actions/read_event.go new file mode 100644 index 0000000..487f25f --- /dev/null +++ b/actions/read_event.go @@ -0,0 +1,32 @@ +package actions + +import ( + "fmt" + + "github.com/pkg/errors" +) + +type ReadEvent struct { + Stream string + Cursor string + Amount int +} + +const minTimeuuid = "00000000-0000-1000-8080-808080808080" + +func (re ReadEvent) Run(c Context) error { + persistenceProvider := c.Persistence() + session, err := persistenceProvider.Session() + if err != nil { + return errors.Wrap(err, "Obtaining session failed") + } + if len(re.Cursor) == 0 { + re.Cursor = minTimeuuid + } + events, err := session.ReadEvents(re.Stream, re.Cursor, re.Amount) + for _, ev := range events { + js := string(ev.Blob) + fmt.Printf("%s - %s: {%v} %s\n", ev.Stream, ev.ID, ev.Meta, js) + } + return errors.Wrap(err, fmt.Sprintf("Error reading event from stream %s", re.Stream)) +} diff --git a/actions/write_event.go b/actions/write_event.go new file mode 100644 index 0000000..7630a8c --- /dev/null +++ b/actions/write_event.go @@ -0,0 +1,23 @@ +package actions + +import ( + "fmt" + + "github.com/pkg/errors" +) + +type WriteEvent struct { + Stream string + Payload []byte + Meta map[string]string +} + +func (we WriteEvent) Run(c Context) error { + persistenceProvider := c.Persistence() + session, err := persistenceProvider.Session() + if err != nil { + return errors.Wrap(err, "Obtaining session failed") + } + we.Meta["version"] = c.Version() + return errors.Wrap(session.WriteEvent(we.Stream, we.Payload, we.Meta), fmt.Sprintf("Error writing event to stream %s", we.Stream)) +} diff --git a/actions/write_event_from_stream.go b/actions/write_event_from_stream.go new file mode 100644 index 0000000..b23838e --- /dev/null +++ b/actions/write_event_from_stream.go @@ -0,0 +1,52 @@ +package actions + +import ( + "encoding/json" + "io" + + "github.com/pkg/errors" +) + +type WriteEventFromStream struct { + Stream string + Input io.Reader +} + +// Read up to Mb from stream +const MAX_READ = 1024 * 1024 + +func readJSONFromStream(input io.Reader) ([]byte, error) { + inputBuf := make([]byte, MAX_READ) + _, err := input.Read(inputBuf) + if err != nil { + return inputBuf, errors.Wrap(err, "Input read failed") + } + for i, v := range inputBuf { + if v == '\x00' { + inputBuf = inputBuf[:i] + break + } + } + var js map[string]interface{} + err = json.Unmarshal(inputBuf, &js) + if err != nil { + return inputBuf, errors.Wrap(err, "Input is not JSON") + } + out, err := json.Marshal(js) + if err != nil { + return inputBuf, errors.Wrap(err, "Marshalling as JSON failed") + } + return out, nil +} + +func (wes WriteEventFromStream) Run(c Context) error { + we := WriteEvent{Stream: wes.Stream, Meta: make(map[string]string)} + we.Meta["origin"] = "stream" + we.Meta["compressed"] = "false" + var err error + we.Payload, err = readJSONFromStream(wes.Input) + if err != nil { + return errors.Wrap(err, "Failed reading input") + } + return errors.Wrap(we.Run(c), "Failed running WriteEvent action") +} diff --git a/config.go b/config.go index a52bc99..415d74c 100644 --- a/config.go +++ b/config.go @@ -24,3 +24,7 @@ func (c Config) Persistence() persistence.Provider { Keyspace: c.keyspace} return &provider } + +func (c Config) Version() string { + return Version +} diff --git a/main.go b/main.go index e11ff99..b6b29da 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,9 @@ func main() { app.Version = Version app.Action = func(c *cli.Context) error { config := Config{keyspace: "archai_test"} - return actions.Migrate{}.Run(config) + action := actions.ReadEvent{Stream: "testing-stream", Amount: 10} + err := action.Run(config) + return err } app.Run(os.Args) diff --git a/persistence/event.go b/persistence/event.go index 0fe5d49..0e2d646 100644 --- a/persistence/event.go +++ b/persistence/event.go @@ -1,4 +1,33 @@ package persistence +import ( + "fmt" + + "github.com/gocql/gocql" +) + type Event struct { + ID string + Stream string + Blob []byte + Meta map[string]string +} + +func eventFromRow(row map[string]interface{}) (Event, error) { + var conv bool + ev := Event{} + id, conv := row["id"].(gocql.UUID) + if !conv { + return ev, fmt.Errorf("Failed converting %v to UUID", row["id"]) + } + ev.ID = id.String() + ev.Blob, conv = row["blob"].([]byte) + if !conv { + return ev, fmt.Errorf("Failed converting %v to blob", row["blob"]) + } + ev.Meta, conv = row["meta"].(map[string]string) + if !conv { + return ev, fmt.Errorf("Failed converting %v to map", row["map"]) + } + return ev, nil } diff --git a/persistence/migration_session.go b/persistence/migration_session.go index 00f6899..bdffa04 100644 --- a/persistence/migration_session.go +++ b/persistence/migration_session.go @@ -6,6 +6,7 @@ import ( "log" "github.com/gocql/gocql" + "github.com/pkg/errors" ) type MigrationSession interface { @@ -37,13 +38,13 @@ var insertMigration = fmt.Sprintf(`INSERT INTO %s (name) VALUES (?)`, migrationT func (sess *CassandraMigrationSession) ShouldRunMigration(name string) (bool, error) { if err := sess.session.Query(createMigrationTable).Exec(); err != nil { - return false, err + return false, errors.Wrap(err, "Query to createMigrationTable failed") } log.Println("Looking for migration ", name) iter := sess.session.Query(findMigration, name).Iter() found := iter.Scan(nil) err := iter.Close() - return !found, err + return !found, errors.Wrap(err, "Closing iterator for findMigration failed") } func (sess *CassandraMigrationSession) DidRunMigration(name string) error { diff --git a/persistence/migrations.go b/persistence/migrations.go index 9e93e5f..80873f2 100644 --- a/persistence/migrations.go +++ b/persistence/migrations.go @@ -1,5 +1,7 @@ package persistence +import "github.com/pkg/errors" + type Migration interface { Run(MigrationSession) error } @@ -10,7 +12,7 @@ type SimpleMigration struct { func (simpleMigration SimpleMigration) Run(session MigrationSession) error { err := session.Query(simpleMigration.Query) - return err + return errors.Wrap(err, "Query to execute SimpleMigration failed") } var CreateEventsTable = SimpleMigration{Query: ` diff --git a/persistence/provider.go b/persistence/provider.go index 8f03fa7..e92be50 100644 --- a/persistence/provider.go +++ b/persistence/provider.go @@ -4,6 +4,7 @@ import ( "fmt" "github.com/gocql/gocql" + "github.com/pkg/errors" ) type Provider interface { @@ -25,7 +26,7 @@ func (cp *CassandraProvider) Session() (Session, error) { cluster.Keyspace = cp.Keyspace cluster.Consistency = gocql.Quorum sess, err := cluster.CreateSession() - return &CassandraSession{session: sess}, err + return &CassandraSession{session: sess}, errors.Wrap(err, "CreateSession failed") } func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) { @@ -33,14 +34,14 @@ func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) { cluster.Consistency = gocql.All sess, err := cluster.CreateSession() if err != nil { - return &CassandraMigrationSession{}, err + return &CassandraMigrationSession{}, errors.Wrap(err, "CreateSession failed") } err = sess.Query(fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`, cp.Keyspace)).Exec() if err != nil { - return &CassandraMigrationSession{}, err + return &CassandraMigrationSession{}, errors.Wrap(err, "Query to CreateKeyspace failed") } cluster.Keyspace = cp.Keyspace sess, err = cluster.CreateSession() - return &CassandraMigrationSession{session: sess}, err + return &CassandraMigrationSession{session: sess}, errors.Wrap(err, "CreateSession failed") } diff --git a/persistence/session.go b/persistence/session.go index faa61fb..fc6b9ab 100644 --- a/persistence/session.go +++ b/persistence/session.go @@ -2,11 +2,12 @@ package persistence import ( "github.com/gocql/gocql" + "github.com/pkg/errors" ) type Session interface { WriteEvent(string, []byte, map[string]string) error - ReadEvents(string, string, int) []Event + ReadEvents(string, string, int) ([]Event, error) } type CassandraSession struct { @@ -16,10 +17,22 @@ type CassandraSession struct { const insertEvent = `INSERT INTO events (id, stream, blob, meta) VALUES (now(), ?, ?, ?)` func (sess *CassandraSession) WriteEvent(stream string, blob []byte, meta map[string]string) error { - return sess.session.Query(insertEvent, stream, blob, meta).Exec() + return errors.Wrap(sess.session.Query(insertEvent, stream, blob, meta).Exec(), "Error writing event to Cassandra") } -func (sess *CassandraSession) ReadEvents(stream string, cursor string, amount int) []Event { - events := make([]Event, 0) - return events +const readEvent = `SELECT id, blob, meta FROM events WHERE stream = ? AND id > ? LIMIT ?` + +func (sess *CassandraSession) ReadEvents(stream string, cursor string, amount int) ([]Event, error) { + iter := sess.session.Query(readEvent, stream, cursor, amount).Iter() + rows, err := iter.SliceMap() + events := make([]Event, len(rows)) + for i, r := range rows { + events[i], err = eventFromRow(r) + if err != nil { + return events, errors.Wrap(err, "Conversion to Event failed") + } + events[i].Stream = stream + } + err = iter.Close() + return events, errors.Wrap(err, "Failed readEvent") }