From 88313cd52f2d9207488290a1546948669e3f63b8 Mon Sep 17 00:00:00 2001 From: lczaplinski Date: Thu, 16 Feb 2017 19:58:07 +0100 Subject: [PATCH] Better reader structure --- actions/{read_event.go => read_events.go} | 10 +++--- actions/read_events_to_stream.go | 38 +++++++++++++++++++++++ main.go | 2 +- 3 files changed, 45 insertions(+), 5 deletions(-) rename actions/{read_event.go => read_events.go} (77%) create mode 100644 actions/read_events_to_stream.go diff --git a/actions/read_event.go b/actions/read_events.go similarity index 77% rename from actions/read_event.go rename to actions/read_events.go index 487f25f..7a3c284 100644 --- a/actions/read_event.go +++ b/actions/read_events.go @@ -4,17 +4,20 @@ import ( "fmt" "github.com/pkg/errors" + "github.com/scoiatael/archai/persistence" ) -type ReadEvent struct { +type ReadEvents struct { Stream string Cursor string Amount int + + Output chan (persistence.Event) } const minTimeuuid = "00000000-0000-1000-8080-808080808080" -func (re ReadEvent) Run(c Context) error { +func (re ReadEvents) Run(c Context) error { persistenceProvider := c.Persistence() session, err := persistenceProvider.Session() if err != nil { @@ -25,8 +28,7 @@ func (re ReadEvent) Run(c Context) error { } events, err := session.ReadEvents(re.Stream, re.Cursor, re.Amount) for _, ev := range events { - js := string(ev.Blob) - fmt.Printf("%s - %s: {%v} %s\n", ev.Stream, ev.ID, ev.Meta, js) + re.Output <- ev } return errors.Wrap(err, fmt.Sprintf("Error reading event from stream %s", re.Stream)) } diff --git a/actions/read_events_to_stream.go b/actions/read_events_to_stream.go new file mode 100644 index 0000000..8ef75ca --- /dev/null +++ b/actions/read_events_to_stream.go @@ -0,0 +1,38 @@ +package actions + +import ( + "fmt" + "io" + + "github.com/scoiatael/archai/persistence" +) + +type ReadEventsToStream struct { + Stream string + Cursor string + Amount int + + Output io.Writer +} + +func printEventToStream(out io.Writer, ev persistence.Event) error { + js := string(ev.Blob) + str := fmt.Sprintf("%s - %s: {%v} %s\n", ev.Stream, ev.ID, ev.Meta, js) + _, err := out.Write([]byte(str)) + return err +} + +func (res ReadEventsToStream) Run(c Context) error { + ch := make(chan (persistence.Event), 100) + err := ReadEvents{Stream: res.Stream, Cursor: res.Cursor, Amount: 10, Output: ch}.Run(c) + if err != nil { + return err + } + for ev := range ch { + err := printEventToStream(res.Output, ev) + if err != nil { + return err + } + } + return nil +} diff --git a/main.go b/main.go index b6b29da..33a4212 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,7 @@ func main() { app.Version = Version app.Action = func(c *cli.Context) error { config := Config{keyspace: "archai_test"} - action := actions.ReadEvent{Stream: "testing-stream", Amount: 10} + action := actions.ReadEventsToStream{Stream: "testing-stream", Output: os.Stdout} err := action.Run(config) return err }