From 6da873490f7eb87643e3c5dc81b5accf15a07ce1 Mon Sep 17 00:00:00 2001 From: scoiatael Date: Wed, 1 Mar 2017 18:03:48 +0100 Subject: [PATCH] Read all events from stream --- .gitignore | 2 ++ Makefile | 3 +- actions/http_server.go | 10 +++---- actions/read_events_to_stream.go | 47 ++++++++++++++++++++---------- actions/write_event_from_stream.go | 47 ++++++++++++------------------ main.go | 7 +++-- persistence/persistence_test.go | 20 ++----------- persistence/provider.go | 23 +++++++++++---- simplejson/json.go | 36 +++++++++++++++++++++++ types/event.go | 17 +++++++++++ 10 files changed, 135 insertions(+), 77 deletions(-) create mode 100644 simplejson/json.go diff --git a/.gitignore b/.gitignore index 490aac4..bd126cc 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,8 @@ *.so archai +*.coverprofile + # Folders _obj _test diff --git a/Makefile b/Makefile index 7d368ea..05900af 100644 --- a/Makefile +++ b/Makefile @@ -9,5 +9,4 @@ siege: bash scripts/siege.sh test: - go test - cd persistence && go test + ginkgo -r --randomizeAllSpecs --randomizeSuites --failOnPending --cover --trace --race --progress diff --git a/actions/http_server.go b/actions/http_server.go index be07d99..4769873 100644 --- a/actions/http_server.go +++ b/actions/http_server.go @@ -6,6 +6,7 @@ import ( "github.com/pkg/errors" "github.com/scoiatael/archai/http" + "github.com/scoiatael/archai/simplejson" ) type HttpServer struct { @@ -25,13 +26,12 @@ func (hs HttpServer) Run(c Context) error { c.HandleErr(err) ctx.ServerErr(err) } else { - root := make(map[string]interface{}) - events := make([]map[string]interface{}, len(action.Events)) + root := make(simplejson.Object) + events := make(simplejson.ObjectArray, len(action.Events)) for i, ev := range action.Events { - events[i] = make(map[string]interface{}) + events[i] = make(simplejson.Object) events[i]["ID"] = ev.ID - payload := make(map[string]interface{}) - err := json.Unmarshal(ev.Blob, &payload) + payload, err := simplejson.Read(ev.Blob) if err != nil { c.HandleErr(err) ctx.ServerErr(err) diff --git a/actions/read_events_to_stream.go b/actions/read_events_to_stream.go index 1055dab..42beb01 100644 --- a/actions/read_events_to_stream.go +++ b/actions/read_events_to_stream.go @@ -1,8 +1,7 @@ package actions import ( - "fmt" - "io" + "bufio" "github.com/scoiatael/archai/types" ) @@ -11,29 +10,45 @@ type ReadEventsToStream struct { Stream string Cursor string - Output io.Writer + Output bufio.Writer } -func printEventToStream(out io.Writer, ev types.Event) error { - js := string(ev.Blob) - str := fmt.Sprintf("%s - %s: {%v} %s\n", ev.Stream, ev.ID, ev.Meta, js) - _, err := out.Write([]byte(str)) +func printEventToStream(out bufio.Writer, ev types.Event) error { + buf, err := types.EventToJson(ev) + if err != nil { + return err + } + _, err = out.Write(buf) + if err != nil { + return err + } + _, err = out.WriteRune('\n') + if err != nil { + return err + } + err = out.Flush() return err } func (res ReadEventsToStream) Run(c Context) error { - action := ReadEvents{Stream: res.Stream, Cursor: res.Cursor, Amount: 10} - err := action.Run(c) - events := action.Events - if err != nil { - return err - } - res.Output.Write([]byte(fmt.Sprintln("STREAM -- ID -- Meta -- Blob"))) - for _, ev := range events { - err := printEventToStream(res.Output, ev) + size := 10 + cursor := res.Cursor + for { + action := ReadEvents{Stream: res.Stream, Cursor: cursor, Amount: size} + err := action.Run(c) + events := action.Events if err != nil { return err } + for _, ev := range events { + err := printEventToStream(res.Output, ev) + if err != nil { + return err + } + } + if len(events) < size { + break + } } return nil } diff --git a/actions/write_event_from_stream.go b/actions/write_event_from_stream.go index b23838e..0b847bf 100644 --- a/actions/write_event_from_stream.go +++ b/actions/write_event_from_stream.go @@ -1,41 +1,24 @@ package actions import ( - "encoding/json" + "bufio" "io" "github.com/pkg/errors" + "github.com/scoiatael/archai/simplejson" ) type WriteEventFromStream struct { Stream string - Input io.Reader + Input bufio.Reader } -// Read up to Mb from stream -const MAX_READ = 1024 * 1024 - -func readJSONFromStream(input io.Reader) ([]byte, error) { - inputBuf := make([]byte, MAX_READ) - _, err := input.Read(inputBuf) - if err != nil { - return inputBuf, errors.Wrap(err, "Input read failed") - } - for i, v := range inputBuf { - if v == '\x00' { - inputBuf = inputBuf[:i] - break - } - } - var js map[string]interface{} - err = json.Unmarshal(inputBuf, &js) +func readJSONFromStream(input bufio.Reader) ([]byte, error) { + buf, err := input.ReadBytes('\n') if err != nil { - return inputBuf, errors.Wrap(err, "Input is not JSON") - } - out, err := json.Marshal(js) - if err != nil { - return inputBuf, errors.Wrap(err, "Marshalling as JSON failed") + return buf, err } + out, err := simplejson.Validate(buf) return out, nil } @@ -44,9 +27,17 @@ func (wes WriteEventFromStream) Run(c Context) error { we.Meta["origin"] = "stream" we.Meta["compressed"] = "false" var err error - we.Payload, err = readJSONFromStream(wes.Input) - if err != nil { - return errors.Wrap(err, "Failed reading input") + for { + we.Payload, err = readJSONFromStream(wes.Input) + if err == io.EOF { + return nil + } + if err != nil { + return errors.Wrap(err, "Failed reading input") + } + err = errors.Wrap(we.Run(c), "Failed running WriteEvent action") + if err != nil { + return err + } } - return errors.Wrap(we.Run(c), "Failed running WriteEvent action") } diff --git a/main.go b/main.go index 49d80b1..b66dcb2 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "bufio" "os" "github.com/scoiatael/archai/actions" @@ -13,11 +14,11 @@ func main() { app.Usage = "eventstore replacement" app.Version = Version app.Action = func(c *cli.Context) error { - config := Config{Keyspace: "archai_test", Hosts: []string{"127.0.0.1"}} + config := Config{Keyspace: "archai_test3", Hosts: []string{"127.0.0.1"}} config.Actions = []actions.Action{ actions.Migrate{}, - //actions.WriteEventFromStream{Stream: "test-stream", Input: os.Stdin}, - actions.ReadEventsToStream{Stream: "test-stream", Output: os.Stdout}, + // actions.WriteEventFromStream{Stream: "test-stream", Input: os.Stdin}, + actions.ReadEventsToStream{Stream: "test-stream", Output: *bufio.NewWriter(os.Stdout)}, actions.HttpServer{Port: 8080, Addr: "127.0.0.1"}, } return config.Run() diff --git a/persistence/persistence_test.go b/persistence/persistence_test.go index 09ede4b..3543453 100644 --- a/persistence/persistence_test.go +++ b/persistence/persistence_test.go @@ -72,22 +72,8 @@ var _ = AfterSuite(func() { var _ = Describe("Persistence", func() { Describe("MigrationSession", func() { - It("creates keyspace", func() { - var ( - exists bool - err error - ) - exists, err = testingKeyspaceExists() - Expect(err).NotTo(HaveOccurred()) - if exists { - Skip("Testing keyspace already exists; drop it to run this test") - } - - sess, err := provider.MigrationSession() - Expect(err).NotTo(HaveOccurred()) - defer sess.Close() - - exists, err = testingKeyspaceExists() + It("ensures keyspace is created", func() { + exists, err := testingKeyspaceExists() Expect(err).NotTo(HaveOccurred()) Expect(exists).To(BeTrue()) @@ -228,7 +214,7 @@ var _ = Describe("Persistence", func() { err = sess.WriteEvent(stream, blob, make(map[string]string)) Expect(err).NotTo(HaveOccurred()) }) - }, 1) + }, 10) }) }) }) diff --git a/persistence/provider.go b/persistence/provider.go index 4dc32c2..b03fe7f 100644 --- a/persistence/provider.go +++ b/persistence/provider.go @@ -40,26 +40,37 @@ func (cp *CassandraProvider) Session() (Session, error) { const createKeySpace = `CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };` -func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) { - cluster := cp.NewCluster() +func (c *CassandraProvider) createKeySpace() error { + cluster := c.NewCluster() cluster.Timeout = 2000 * time.Millisecond cluster.Consistency = gocql.All sess, err := cluster.CreateSession() if err != nil { - return &CassandraMigrationSession{}, errors.Wrap(err, "CreateSession failed") + return errors.Wrap(err, "CreateSession failed") } defer sess.Close() - err = sess.Query(fmt.Sprintf(createKeySpace, cp.Keyspace)).Exec() + err = sess.Query(fmt.Sprintf(createKeySpace, c.Keyspace)).Exec() if err != nil { - return &CassandraMigrationSession{}, errors.Wrap(err, "Query to CreateKeyspace failed") + return errors.Wrap(err, "Query to CreateKeyspace failed") } + return nil +} + +func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) { + cluster := cp.NewCluster() + cluster.Timeout = 2000 * time.Millisecond + cluster.Consistency = gocql.All cluster.Keyspace = cp.Keyspace - sess, err = cluster.CreateSession() + sess, err := cluster.CreateSession() return &CassandraMigrationSession{sess}, errors.Wrap(err, "CreateSession failed") } func (c *CassandraProvider) Init() error { + err := c.createKeySpace() + if err != nil { + return err + } new_sess, err := c.newSession() if err == nil { c.session = &new_sess diff --git a/simplejson/json.go b/simplejson/json.go new file mode 100644 index 0000000..cdc4314 --- /dev/null +++ b/simplejson/json.go @@ -0,0 +1,36 @@ +package simplejson + +import ( + "encoding/json" + + "github.com/pkg/errors" +) + +type Json interface{} +type Object map[string]Json +type String string +type Array []Json +type ObjectArray []Object + +func Read(buf []byte) (Object, error) { + var js Object + + err := json.Unmarshal(buf, &js) + return js, errors.Wrap(err, "Input is not JSON") +} + +func Write(js Object) ([]byte, error) { + out, err := json.Marshal(js) + return out, errors.Wrap(err, "Marshalling as JSON failed") +} + +func Validate(buf []byte) ([]byte, error) { + var ( + err error + ) + js, err := Read(buf) + if err != nil { + return buf, err + } + return Write(js) +} diff --git a/types/event.go b/types/event.go index c2e323e..841a3f6 100644 --- a/types/event.go +++ b/types/event.go @@ -1,8 +1,25 @@ package types +import ( + "github.com/scoiatael/archai/simplejson" +) + type Event struct { ID string Stream string Blob []byte Meta map[string]string } + +func EventToJson(e Event) ([]byte, error) { + object := make(simplejson.Object) + object["id"] = e.ID + object["stream"] = e.Stream + payload, err := simplejson.Read(e.Blob) + if err != nil { + return []byte{}, err + } + object["payload"] = payload + return simplejson.Write(object) + +}