Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Commit

Permalink
Write, Read CLI functions
Browse files Browse the repository at this point in the history
  • Loading branch information
scoiatael committed Feb 14, 2017
1 parent fc6b7f3 commit f078a46
Show file tree
Hide file tree
Showing 12 changed files with 180 additions and 17 deletions.
1 change: 1 addition & 0 deletions actions/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
type Context interface {
Persistence() persistence.Provider
Migrations() map[string]persistence.Migration
Version() string
}

type Action interface {
Expand Down
11 changes: 7 additions & 4 deletions actions/migrate.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package actions

import (
"fmt"
"log"

"github.com/pkg/errors"
)

// Migrate implements Action interface
Expand All @@ -12,22 +15,22 @@ func (a Migrate) Run(c Context) error {
persistenceProvider := c.Persistence()
migrationSession, err := persistenceProvider.MigrationSession()
if err != nil {
return err
return errors.Wrap(err, "Obtaining session failed")
}
for name, m := range c.Migrations() {
shouldRun, err := migrationSession.ShouldRunMigration(name)
if err != nil {
return err
return errors.Wrap(err, fmt.Sprintf("Checking if should run migration %s failed", name))
}
if shouldRun {
log.Println("Executing ", name)
err = m.Run(migrationSession)
if err != nil {
return err
return errors.Wrap(err, fmt.Sprintf("Running migration %s failed", name))
}
err = migrationSession.DidRunMigration(name)
if err != nil {
return err
return errors.Wrap(err, fmt.Sprintf("Running after-migration hook for %s failed", name))
}
}
}
Expand Down
32 changes: 32 additions & 0 deletions actions/read_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package actions

import (
"fmt"

"github.com/pkg/errors"
)

type ReadEvent struct {
Stream string
Cursor string
Amount int
}

const minTimeuuid = "00000000-0000-1000-8080-808080808080"

func (re ReadEvent) Run(c Context) error {
persistenceProvider := c.Persistence()
session, err := persistenceProvider.Session()
if err != nil {
return errors.Wrap(err, "Obtaining session failed")
}
if len(re.Cursor) == 0 {
re.Cursor = minTimeuuid
}
events, err := session.ReadEvents(re.Stream, re.Cursor, re.Amount)
for _, ev := range events {
js := string(ev.Blob)
fmt.Printf("%s - %s: {%v} %s\n", ev.Stream, ev.ID, ev.Meta, js)
}
return errors.Wrap(err, fmt.Sprintf("Error reading event from stream %s", re.Stream))
}
23 changes: 23 additions & 0 deletions actions/write_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package actions

import (
"fmt"

"github.com/pkg/errors"
)

type WriteEvent struct {
Stream string
Payload []byte
Meta map[string]string
}

func (we WriteEvent) Run(c Context) error {
persistenceProvider := c.Persistence()
session, err := persistenceProvider.Session()
if err != nil {
return errors.Wrap(err, "Obtaining session failed")
}
we.Meta["version"] = c.Version()
return errors.Wrap(session.WriteEvent(we.Stream, we.Payload, we.Meta), fmt.Sprintf("Error writing event to stream %s", we.Stream))
}
52 changes: 52 additions & 0 deletions actions/write_event_from_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package actions

import (
"encoding/json"
"io"

"github.com/pkg/errors"
)

type WriteEventFromStream struct {
Stream string
Input io.Reader
}

// Read up to Mb from stream
const MAX_READ = 1024 * 1024

func readJSONFromStream(input io.Reader) ([]byte, error) {
inputBuf := make([]byte, MAX_READ)
_, err := input.Read(inputBuf)
if err != nil {
return inputBuf, errors.Wrap(err, "Input read failed")
}
for i, v := range inputBuf {
if v == '\x00' {
inputBuf = inputBuf[:i]
break
}
}
var js map[string]interface{}
err = json.Unmarshal(inputBuf, &js)
if err != nil {
return inputBuf, errors.Wrap(err, "Input is not JSON")
}
out, err := json.Marshal(js)
if err != nil {
return inputBuf, errors.Wrap(err, "Marshalling as JSON failed")
}
return out, nil
}

func (wes WriteEventFromStream) Run(c Context) error {
we := WriteEvent{Stream: wes.Stream, Meta: make(map[string]string)}
we.Meta["origin"] = "stream"
we.Meta["compressed"] = "false"
var err error
we.Payload, err = readJSONFromStream(wes.Input)
if err != nil {
return errors.Wrap(err, "Failed reading input")
}
return errors.Wrap(we.Run(c), "Failed running WriteEvent action")
}
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ func (c Config) Persistence() persistence.Provider {
Keyspace: c.keyspace}
return &provider
}

func (c Config) Version() string {
return Version
}
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ func main() {
app.Version = Version
app.Action = func(c *cli.Context) error {
config := Config{keyspace: "archai_test"}
return actions.Migrate{}.Run(config)
action := actions.ReadEvent{Stream: "testing-stream", Amount: 10}
err := action.Run(config)
return err
}

app.Run(os.Args)
Expand Down
29 changes: 29 additions & 0 deletions persistence/event.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,33 @@
package persistence

import (
"fmt"

"github.com/gocql/gocql"
)

type Event struct {
ID string
Stream string
Blob []byte
Meta map[string]string
}

func eventFromRow(row map[string]interface{}) (Event, error) {
var conv bool
ev := Event{}
id, conv := row["id"].(gocql.UUID)
if !conv {
return ev, fmt.Errorf("Failed converting %v to UUID", row["id"])
}
ev.ID = id.String()
ev.Blob, conv = row["blob"].([]byte)
if !conv {
return ev, fmt.Errorf("Failed converting %v to blob", row["blob"])
}
ev.Meta, conv = row["meta"].(map[string]string)
if !conv {
return ev, fmt.Errorf("Failed converting %v to map", row["map"])
}
return ev, nil
}
5 changes: 3 additions & 2 deletions persistence/migration_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"log"

"github.com/gocql/gocql"
"github.com/pkg/errors"
)

type MigrationSession interface {
Expand Down Expand Up @@ -37,13 +38,13 @@ var insertMigration = fmt.Sprintf(`INSERT INTO %s (name) VALUES (?)`, migrationT

func (sess *CassandraMigrationSession) ShouldRunMigration(name string) (bool, error) {
if err := sess.session.Query(createMigrationTable).Exec(); err != nil {
return false, err
return false, errors.Wrap(err, "Query to createMigrationTable failed")
}
log.Println("Looking for migration ", name)
iter := sess.session.Query(findMigration, name).Iter()
found := iter.Scan(nil)
err := iter.Close()
return !found, err
return !found, errors.Wrap(err, "Closing iterator for findMigration failed")
}

func (sess *CassandraMigrationSession) DidRunMigration(name string) error {
Expand Down
4 changes: 3 additions & 1 deletion persistence/migrations.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package persistence

import "github.com/pkg/errors"

type Migration interface {
Run(MigrationSession) error
}
Expand All @@ -10,7 +12,7 @@ type SimpleMigration struct {

func (simpleMigration SimpleMigration) Run(session MigrationSession) error {
err := session.Query(simpleMigration.Query)
return err
return errors.Wrap(err, "Query to execute SimpleMigration failed")
}

var CreateEventsTable = SimpleMigration{Query: `
Expand Down
9 changes: 5 additions & 4 deletions persistence/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/gocql/gocql"
"github.com/pkg/errors"
)

type Provider interface {
Expand All @@ -25,22 +26,22 @@ func (cp *CassandraProvider) Session() (Session, error) {
cluster.Keyspace = cp.Keyspace
cluster.Consistency = gocql.Quorum
sess, err := cluster.CreateSession()
return &CassandraSession{session: sess}, err
return &CassandraSession{session: sess}, errors.Wrap(err, "CreateSession failed")
}

func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) {
cluster := cp.newCluster()
cluster.Consistency = gocql.All
sess, err := cluster.CreateSession()
if err != nil {
return &CassandraMigrationSession{}, err
return &CassandraMigrationSession{}, errors.Wrap(err, "CreateSession failed")
}
err = sess.Query(fmt.Sprintf(`CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`, cp.Keyspace)).Exec()
if err != nil {
return &CassandraMigrationSession{}, err
return &CassandraMigrationSession{}, errors.Wrap(err, "Query to CreateKeyspace failed")
}
cluster.Keyspace = cp.Keyspace
sess, err = cluster.CreateSession()

return &CassandraMigrationSession{session: sess}, err
return &CassandraMigrationSession{session: sess}, errors.Wrap(err, "CreateSession failed")
}
23 changes: 18 additions & 5 deletions persistence/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@ package persistence

import (
"github.com/gocql/gocql"
"github.com/pkg/errors"
)

type Session interface {
WriteEvent(string, []byte, map[string]string) error
ReadEvents(string, string, int) []Event
ReadEvents(string, string, int) ([]Event, error)
}

type CassandraSession struct {
Expand All @@ -16,10 +17,22 @@ type CassandraSession struct {
const insertEvent = `INSERT INTO events (id, stream, blob, meta) VALUES (now(), ?, ?, ?)`

func (sess *CassandraSession) WriteEvent(stream string, blob []byte, meta map[string]string) error {
return sess.session.Query(insertEvent, stream, blob, meta).Exec()
return errors.Wrap(sess.session.Query(insertEvent, stream, blob, meta).Exec(), "Error writing event to Cassandra")
}

func (sess *CassandraSession) ReadEvents(stream string, cursor string, amount int) []Event {
events := make([]Event, 0)
return events
const readEvent = `SELECT id, blob, meta FROM events WHERE stream = ? AND id > ? LIMIT ?`

func (sess *CassandraSession) ReadEvents(stream string, cursor string, amount int) ([]Event, error) {
iter := sess.session.Query(readEvent, stream, cursor, amount).Iter()
rows, err := iter.SliceMap()
events := make([]Event, len(rows))
for i, r := range rows {
events[i], err = eventFromRow(r)
if err != nil {
return events, errors.Wrap(err, "Conversion to Event failed")
}
events[i].Stream = stream
}
err = iter.Close()
return events, errors.Wrap(err, "Failed readEvent")
}

0 comments on commit f078a46

Please sign in to comment.