Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Commit

Permalink
Better reader structure
Browse files Browse the repository at this point in the history
  • Loading branch information
scoiatael committed Feb 16, 2017
1 parent f078a46 commit 88313cd
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 5 deletions.
10 changes: 6 additions & 4 deletions actions/read_event.go → actions/read_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@ import (
"fmt"

"github.com/pkg/errors"
"github.com/scoiatael/archai/persistence"
)

type ReadEvent struct {
type ReadEvents struct {
Stream string
Cursor string
Amount int

Output chan (persistence.Event)
}

const minTimeuuid = "00000000-0000-1000-8080-808080808080"

func (re ReadEvent) Run(c Context) error {
func (re ReadEvents) Run(c Context) error {
persistenceProvider := c.Persistence()
session, err := persistenceProvider.Session()
if err != nil {
Expand All @@ -25,8 +28,7 @@ func (re ReadEvent) Run(c Context) error {
}
events, err := session.ReadEvents(re.Stream, re.Cursor, re.Amount)
for _, ev := range events {
js := string(ev.Blob)
fmt.Printf("%s - %s: {%v} %s\n", ev.Stream, ev.ID, ev.Meta, js)
re.Output <- ev
}
return errors.Wrap(err, fmt.Sprintf("Error reading event from stream %s", re.Stream))
}
38 changes: 38 additions & 0 deletions actions/read_events_to_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package actions

import (
"fmt"
"io"

"github.com/scoiatael/archai/persistence"
)

type ReadEventsToStream struct {
Stream string
Cursor string
Amount int

Output io.Writer
}

func printEventToStream(out io.Writer, ev persistence.Event) error {
js := string(ev.Blob)
str := fmt.Sprintf("%s - %s: {%v} %s\n", ev.Stream, ev.ID, ev.Meta, js)
_, err := out.Write([]byte(str))
return err
}

func (res ReadEventsToStream) Run(c Context) error {
ch := make(chan (persistence.Event), 100)
err := ReadEvents{Stream: res.Stream, Cursor: res.Cursor, Amount: 10, Output: ch}.Run(c)
if err != nil {
return err
}
for ev := range ch {
err := printEventToStream(res.Output, ev)
if err != nil {
return err
}
}
return nil
}
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func main() {
app.Version = Version
app.Action = func(c *cli.Context) error {
config := Config{keyspace: "archai_test"}
action := actions.ReadEvent{Stream: "testing-stream", Amount: 10}
action := actions.ReadEventsToStream{Stream: "testing-stream", Output: os.Stdout}
err := action.Run(config)
return err
}
Expand Down

0 comments on commit 88313cd

Please sign in to comment.