Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Commit

Permalink
Read all events from stream
Browse files Browse the repository at this point in the history
  • Loading branch information
scoiatael committed Mar 1, 2017
1 parent aa81669 commit 6da8734
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 77 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
*.so
archai

*.coverprofile

# Folders
_obj
_test
Expand Down
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,4 @@ siege:
bash scripts/siege.sh

test:
go test
cd persistence && go test
ginkgo -r --randomizeAllSpecs --randomizeSuites --failOnPending --cover --trace --race --progress
10 changes: 5 additions & 5 deletions actions/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/pkg/errors"
"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/simplejson"
)

type HttpServer struct {
Expand All @@ -25,13 +26,12 @@ func (hs HttpServer) Run(c Context) error {
c.HandleErr(err)
ctx.ServerErr(err)
} else {
root := make(map[string]interface{})
events := make([]map[string]interface{}, len(action.Events))
root := make(simplejson.Object)
events := make(simplejson.ObjectArray, len(action.Events))
for i, ev := range action.Events {
events[i] = make(map[string]interface{})
events[i] = make(simplejson.Object)
events[i]["ID"] = ev.ID
payload := make(map[string]interface{})
err := json.Unmarshal(ev.Blob, &payload)
payload, err := simplejson.Read(ev.Blob)
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
Expand Down
47 changes: 31 additions & 16 deletions actions/read_events_to_stream.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package actions

import (
"fmt"
"io"
"bufio"

"github.com/scoiatael/archai/types"
)
Expand All @@ -11,29 +10,45 @@ type ReadEventsToStream struct {
Stream string
Cursor string

Output io.Writer
Output bufio.Writer
}

func printEventToStream(out io.Writer, ev types.Event) error {
js := string(ev.Blob)
str := fmt.Sprintf("%s - %s: {%v} %s\n", ev.Stream, ev.ID, ev.Meta, js)
_, err := out.Write([]byte(str))
func printEventToStream(out bufio.Writer, ev types.Event) error {
buf, err := types.EventToJson(ev)
if err != nil {
return err
}
_, err = out.Write(buf)
if err != nil {
return err
}
_, err = out.WriteRune('\n')
if err != nil {
return err
}
err = out.Flush()
return err
}

func (res ReadEventsToStream) Run(c Context) error {
action := ReadEvents{Stream: res.Stream, Cursor: res.Cursor, Amount: 10}
err := action.Run(c)
events := action.Events
if err != nil {
return err
}
res.Output.Write([]byte(fmt.Sprintln("STREAM -- ID -- Meta -- Blob")))
for _, ev := range events {
err := printEventToStream(res.Output, ev)
size := 10
cursor := res.Cursor
for {
action := ReadEvents{Stream: res.Stream, Cursor: cursor, Amount: size}
err := action.Run(c)
events := action.Events
if err != nil {
return err
}
for _, ev := range events {
err := printEventToStream(res.Output, ev)
if err != nil {
return err
}
}
if len(events) < size {
break
}
}
return nil
}
47 changes: 19 additions & 28 deletions actions/write_event_from_stream.go
Original file line number Diff line number Diff line change
@@ -1,41 +1,24 @@
package actions

import (
"encoding/json"
"bufio"
"io"

"github.com/pkg/errors"
"github.com/scoiatael/archai/simplejson"
)

type WriteEventFromStream struct {
Stream string
Input io.Reader
Input bufio.Reader
}

// Read up to Mb from stream
const MAX_READ = 1024 * 1024

func readJSONFromStream(input io.Reader) ([]byte, error) {
inputBuf := make([]byte, MAX_READ)
_, err := input.Read(inputBuf)
if err != nil {
return inputBuf, errors.Wrap(err, "Input read failed")
}
for i, v := range inputBuf {
if v == '\x00' {
inputBuf = inputBuf[:i]
break
}
}
var js map[string]interface{}
err = json.Unmarshal(inputBuf, &js)
func readJSONFromStream(input bufio.Reader) ([]byte, error) {
buf, err := input.ReadBytes('\n')
if err != nil {
return inputBuf, errors.Wrap(err, "Input is not JSON")
}
out, err := json.Marshal(js)
if err != nil {
return inputBuf, errors.Wrap(err, "Marshalling as JSON failed")
return buf, err
}
out, err := simplejson.Validate(buf)
return out, nil
}

Expand All @@ -44,9 +27,17 @@ func (wes WriteEventFromStream) Run(c Context) error {
we.Meta["origin"] = "stream"
we.Meta["compressed"] = "false"
var err error
we.Payload, err = readJSONFromStream(wes.Input)
if err != nil {
return errors.Wrap(err, "Failed reading input")
for {
we.Payload, err = readJSONFromStream(wes.Input)
if err == io.EOF {
return nil
}
if err != nil {
return errors.Wrap(err, "Failed reading input")
}
err = errors.Wrap(we.Run(c), "Failed running WriteEvent action")
if err != nil {
return err
}
}
return errors.Wrap(we.Run(c), "Failed running WriteEvent action")
}
7 changes: 4 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bufio"
"os"

"github.com/scoiatael/archai/actions"
Expand All @@ -13,11 +14,11 @@ func main() {
app.Usage = "eventstore replacement"
app.Version = Version
app.Action = func(c *cli.Context) error {
config := Config{Keyspace: "archai_test", Hosts: []string{"127.0.0.1"}}
config := Config{Keyspace: "archai_test3", Hosts: []string{"127.0.0.1"}}
config.Actions = []actions.Action{
actions.Migrate{},
//actions.WriteEventFromStream{Stream: "test-stream", Input: os.Stdin},
actions.ReadEventsToStream{Stream: "test-stream", Output: os.Stdout},
// actions.WriteEventFromStream{Stream: "test-stream", Input: os.Stdin},
actions.ReadEventsToStream{Stream: "test-stream", Output: *bufio.NewWriter(os.Stdout)},
actions.HttpServer{Port: 8080, Addr: "127.0.0.1"},
}
return config.Run()
Expand Down
20 changes: 3 additions & 17 deletions persistence/persistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,22 +72,8 @@ var _ = AfterSuite(func() {

var _ = Describe("Persistence", func() {
Describe("MigrationSession", func() {
It("creates keyspace", func() {
var (
exists bool
err error
)
exists, err = testingKeyspaceExists()
Expect(err).NotTo(HaveOccurred())
if exists {
Skip("Testing keyspace already exists; drop it to run this test")
}

sess, err := provider.MigrationSession()
Expect(err).NotTo(HaveOccurred())
defer sess.Close()

exists, err = testingKeyspaceExists()
It("ensures keyspace is created", func() {
exists, err := testingKeyspaceExists()
Expect(err).NotTo(HaveOccurred())
Expect(exists).To(BeTrue())

Expand Down Expand Up @@ -228,7 +214,7 @@ var _ = Describe("Persistence", func() {
err = sess.WriteEvent(stream, blob, make(map[string]string))
Expect(err).NotTo(HaveOccurred())
})
}, 1)
}, 10)
})
})
})
23 changes: 17 additions & 6 deletions persistence/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,26 +40,37 @@ func (cp *CassandraProvider) Session() (Session, error) {

const createKeySpace = `CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };`

func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) {
cluster := cp.NewCluster()
func (c *CassandraProvider) createKeySpace() error {
cluster := c.NewCluster()
cluster.Timeout = 2000 * time.Millisecond
cluster.Consistency = gocql.All
sess, err := cluster.CreateSession()
if err != nil {
return &CassandraMigrationSession{}, errors.Wrap(err, "CreateSession failed")
return errors.Wrap(err, "CreateSession failed")
}
defer sess.Close()
err = sess.Query(fmt.Sprintf(createKeySpace, cp.Keyspace)).Exec()
err = sess.Query(fmt.Sprintf(createKeySpace, c.Keyspace)).Exec()
if err != nil {
return &CassandraMigrationSession{}, errors.Wrap(err, "Query to CreateKeyspace failed")
return errors.Wrap(err, "Query to CreateKeyspace failed")
}
return nil
}

func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) {
cluster := cp.NewCluster()
cluster.Timeout = 2000 * time.Millisecond
cluster.Consistency = gocql.All
cluster.Keyspace = cp.Keyspace
sess, err = cluster.CreateSession()
sess, err := cluster.CreateSession()

return &CassandraMigrationSession{sess}, errors.Wrap(err, "CreateSession failed")
}

func (c *CassandraProvider) Init() error {
err := c.createKeySpace()
if err != nil {
return err
}
new_sess, err := c.newSession()
if err == nil {
c.session = &new_sess
Expand Down
36 changes: 36 additions & 0 deletions simplejson/json.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package simplejson

import (
"encoding/json"

"github.com/pkg/errors"
)

type Json interface{}
type Object map[string]Json
type String string
type Array []Json
type ObjectArray []Object

func Read(buf []byte) (Object, error) {
var js Object

err := json.Unmarshal(buf, &js)
return js, errors.Wrap(err, "Input is not JSON")
}

func Write(js Object) ([]byte, error) {
out, err := json.Marshal(js)
return out, errors.Wrap(err, "Marshalling as JSON failed")
}

func Validate(buf []byte) ([]byte, error) {
var (
err error
)
js, err := Read(buf)
if err != nil {
return buf, err
}
return Write(js)
}
17 changes: 17 additions & 0 deletions types/event.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,25 @@
package types

import (
"github.com/scoiatael/archai/simplejson"
)

type Event struct {
ID string
Stream string
Blob []byte
Meta map[string]string
}

func EventToJson(e Event) ([]byte, error) {
object := make(simplejson.Object)
object["id"] = e.ID
object["stream"] = e.Stream
payload, err := simplejson.Read(e.Blob)
if err != nil {
return []byte{}, err
}
object["payload"] = payload
return simplejson.Write(object)

}

0 comments on commit 6da8734

Please sign in to comment.