From a7cf80b341188bee70c9b9e5099d08f0324919c9 Mon Sep 17 00:00:00 2001 From: scoiatael Date: Thu, 9 Mar 2017 14:17:13 +0100 Subject: [PATCH] List streams --- actions/http_server.go | 13 +++++++++++++ actions/list_streams.go | 26 ++++++++++++++++++++++++++ main.go | 7 +++++++ persistence/session.go | 14 ++++++++++++++ 4 files changed, 60 insertions(+) create mode 100644 actions/list_streams.go diff --git a/actions/http_server.go b/actions/http_server.go index b24a0f6..cc608ee 100644 --- a/actions/http_server.go +++ b/actions/http_server.go @@ -113,6 +113,19 @@ func (hs HttpServer) Run(c Context) error { handler.Get("/_check", func(ctx http.GetContext) { ctx.SendJson("OK") }) + handler.Get("/streams", func(ctx http.GetContext) { + session, err := c.Persistence().Session() + err = errors.Wrap(err, "Obtaining session failed") + if err != nil { + c.HandleErr(err) + ctx.ServerErr(err) + return + } + streams, err := session.ListStreams() + view := make(simplejson.Object) + view["streams"] = streams + ctx.SendJson(view) + }) handler.Get("/stream/:id", func(ctx http.GetContext) { stream := ctx.GetSegment("id") action := ReadEvents{Stream: stream} diff --git a/actions/list_streams.go b/actions/list_streams.go new file mode 100644 index 0000000..ab9e7ab --- /dev/null +++ b/actions/list_streams.go @@ -0,0 +1,26 @@ +package actions + +import ( + "fmt" + + "github.com/pkg/errors" +) + +type ListStreams struct { +} + +func (re ListStreams) Run(c Context) error { + session, err := c.Persistence().Session() + if err != nil { + return errors.Wrap(err, "Obtaining session failed") + } + streams, err := session.ListStreams() + for _, s := range streams { + println(s) + } + return errors.Wrap(err, fmt.Sprintf("Error listing streams")) +} + +func (re ListStreams) MarshalJSON() ([]byte, error) { + return []byte(`"List streams"`), nil +} diff --git a/main.go b/main.go index c78860f..5fa7c4c 100644 --- a/main.go +++ b/main.go @@ -52,6 +52,10 @@ func main() { Value: "", Usage: "Cassandra keyspace replication options", }, + cli.BoolFlag{ + Name: "list-streams", + Usage: "List streams in Cassandra", + }, } app.Action = func(c *cli.Context) error { config := Config{} @@ -64,6 +68,9 @@ func main() { if c.Bool("migrate") { config.Append(actions.Migrate{}) } + if c.Bool("list-streams") { + config.Append(actions.ListStreams{}) + } if c.Bool("dev-logger") { config.Features["dev_logger"] = true } diff --git a/persistence/session.go b/persistence/session.go index 1b89393..1b19557 100644 --- a/persistence/session.go +++ b/persistence/session.go @@ -11,6 +11,7 @@ import ( type Session interface { WriteEvent(string, []byte, map[string]string) error ReadEvents(string, string, int) ([]types.Event, error) + ListStreams() ([]string, error) Close() } @@ -59,3 +60,16 @@ func (sess *CassandraSession) ReadEvents(stream string, cursor string, amount in err = iter.Close() return events, errors.Wrap(err, "Failed readEvent") } + +const listStreams = `SELECT DISTINCT stream FROM events` + +func (sess *CassandraSession) ListStreams() ([]string, error) { + iter := sess.Query(listStreams).Iter() + rows, err := iter.SliceMap() + streams := make([]string, len(rows)) + for i, r := range rows { + streams[i] = r["stream"].(string) + } + err = iter.Close() + return streams, errors.Wrap(err, "Failed listStreams") +}