diff --git a/actions/context.go b/actions/context.go index e717caa..a3d88c8 100644 --- a/actions/context.go +++ b/actions/context.go @@ -1,13 +1,22 @@ package actions import ( + "github.com/scoiatael/archai/http" "github.com/scoiatael/archai/persistence" ) +type HttpHandler interface { + Get(string, func(http.GetContext)) + Post(string, func(http.PostContext)) + Run(string) error +} + type Context interface { Persistence() persistence.Provider Migrations() map[string]persistence.Migration Version() string + HandleErr(error) + HttpHandler() HttpHandler } type Action interface { diff --git a/actions/http_server.go b/actions/http_server.go new file mode 100644 index 0000000..b16d3e2 --- /dev/null +++ b/actions/http_server.go @@ -0,0 +1,74 @@ +package actions + +import ( + "fmt" + + "encoding/json" + + "log" + + "github.com/pkg/errors" + "github.com/scoiatael/archai/http" +) + +type HttpServer struct { + Addr string + Port int +} + +func (hs HttpServer) Run(c Context) error { + handler := c.HttpHandler() + handler.Get("/stream/:id", func(ctx http.GetContext) { + stream := ctx.GetSegment(":id") + action := ReadEvents{Stream: stream} + action.Cursor = ctx.StringParam("stream") + action.Amount = ctx.IntParam("amount", 10) + err := action.Run(c) + if err != nil { + c.HandleErr(err) + ctx.ServerErr(err) + } else { + events := make([]map[string]interface{}, len(action.Events)) + for i, ev := range action.Events { + events[i] = make(map[string]interface{}) + events[i]["ID"] = ev.ID + payload := make(map[string]interface{}) + err := json.Unmarshal(ev.Blob, &payload) + if err != nil { + c.HandleErr(err) + ctx.ServerErr(err) + } + events[i]["blob"] = payload + } + ctx.SendJson(events) + } + }) + handler.Post("/stream/:id", func(ctx http.PostContext) { + var err error + stream := ctx.GetSegment(":id") + body, err := ctx.JsonBodyParams() + if err != nil { + // Error was already send + return + } + log.Println("Got ", body) + payload, err := json.Marshal(body) + if err != nil { + c.HandleErr(err) + ctx.ServerErr(err) + return + } + action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)} + action.Meta["origin"] = "http" + action.Meta["compressed"] = "false" + err = action.Run(c) + if err != nil { + c.HandleErr(err) + ctx.ServerErr(err) + } else { + ctx.SendJson("OK") + } + }) + connString := fmt.Sprintf("%s:%d", hs.Addr, hs.Port) + return errors.Wrap(handler.Run(connString), "HttpServer starting..") +} diff --git a/actions/read_events.go b/actions/read_events.go index 9fc54c0..c16d2cf 100644 --- a/actions/read_events.go +++ b/actions/read_events.go @@ -12,12 +12,12 @@ type ReadEvents struct { Cursor string Amount int - Output chan (types.Event) + Events []types.Event } const minTimeuuid = "00000000-0000-1000-8080-808080808080" -func (re ReadEvents) Run(c Context) error { +func (re *ReadEvents) Run(c Context) error { persistenceProvider := c.Persistence() session, err := persistenceProvider.Session() if err != nil { @@ -28,8 +28,6 @@ func (re ReadEvents) Run(c Context) error { re.Cursor = minTimeuuid } events, err := session.ReadEvents(re.Stream, re.Cursor, re.Amount) - for _, ev := range events { - re.Output <- ev - } + re.Events = events return errors.Wrap(err, fmt.Sprintf("Error reading event from stream %s", re.Stream)) } diff --git a/actions/read_events_to_stream.go b/actions/read_events_to_stream.go index 7c193f7..1055dab 100644 --- a/actions/read_events_to_stream.go +++ b/actions/read_events_to_stream.go @@ -10,7 +10,6 @@ import ( type ReadEventsToStream struct { Stream string Cursor string - Amount int Output io.Writer } @@ -23,12 +22,14 @@ func printEventToStream(out io.Writer, ev types.Event) error { } func (res ReadEventsToStream) Run(c Context) error { - ch := make(chan (types.Event), 100) - err := ReadEvents{Stream: res.Stream, Cursor: res.Cursor, Amount: 10, Output: ch}.Run(c) + action := ReadEvents{Stream: res.Stream, Cursor: res.Cursor, Amount: 10} + err := action.Run(c) + events := action.Events if err != nil { return err } - for ev := range ch { + res.Output.Write([]byte(fmt.Sprintln("STREAM -- ID -- Meta -- Blob"))) + for _, ev := range events { err := printEventToStream(res.Output, ev) if err != nil { return err diff --git a/config.go b/config.go index d60c31c..3f27f0b 100644 --- a/config.go +++ b/config.go @@ -1,7 +1,10 @@ package main import ( + "log" + "github.com/scoiatael/archai/actions" + "github.com/scoiatael/archai/http" "github.com/scoiatael/archai/persistence" ) @@ -12,6 +15,11 @@ type Config struct { Actions []actions.Action } +func (c Config) HandleErr(err error) { + log.Print(err) + panic(err) +} + func (c Config) Migrations() map[string]persistence.Migration { m := make(map[string]persistence.Migration) m["create_events_table"] = persistence.CreateEventsTable @@ -27,3 +35,17 @@ func (c Config) Persistence() persistence.Provider { func (c Config) Version() string { return Version } + +func (c Config) HttpHandler() actions.HttpHandler { + return &http.FastHttpHandler{Context: c} +} + +func (c Config) Run() error { + for _, a := range c.Actions { + err := a.Run(c) + if err != nil { + return err + } + } + return nil +} diff --git a/http/context.go b/http/context.go new file mode 100644 index 0000000..4d0e525 --- /dev/null +++ b/http/context.go @@ -0,0 +1,22 @@ +package http + +type Context interface { + HandleErr(error) +} + +type HttpContext interface { + SendJson(interface{}) + GetSegment(string) string + ServerErr(error) +} + +type GetContext interface { + HttpContext + StringParam(string) string + IntParam(string, int) int +} + +type PostContext interface { + HttpContext + JsonBodyParams() (map[string]interface{}, error) +} diff --git a/http/fast_http.go b/http/fast_http.go new file mode 100644 index 0000000..8a3aee6 --- /dev/null +++ b/http/fast_http.go @@ -0,0 +1,119 @@ +package http + +import ( + "encoding/json" + + "fmt" + + "strings" + + "strconv" + + "github.com/pkg/errors" + "github.com/valyala/fasthttp" +) + +type FastHttpContext struct { + *fasthttp.RequestCtx + Context Context +} + +func (hc FastHttpContext) ServerErr(err error) { + hc.Context.HandleErr(err) + hc.Error(fmt.Sprintf(`{ "error": "%v" }`, err), fasthttp.StatusInternalServerError) +} + +func (hc FastHttpContext) SendJson(response interface{}) { + dump, err := json.Marshal(response) + if err != nil { + hc.ServerErr(err) + } else { + hc.SetBody(dump) + } +} + +// TODO: Do this normal way +func (hc FastHttpContext) GetSegment(index string) string { + segments := strings.Split(string(hc.Path()), "/") + if len(segments) == 0 { + return "" + } else { + return segments[len(segments)-1] + } +} + +type FastHttpGetContext struct { + FastHttpContext +} + +func (gc FastHttpGetContext) StringParam(name string) string { + val := gc.QueryArgs().Peek(name) + return string(val) +} + +func (gc FastHttpGetContext) IntParam(name string, def int) int { + val := gc.QueryArgs().Peek(name) + i, err := strconv.Atoi(string(val)) + if err != nil { + return def + } + return i +} + +type FastHttpPostContext struct { + FastHttpContext +} + +const expectedJSON = `{ "error": "expected JSON body" }` + +func (pc FastHttpPostContext) JsonBodyParams() (map[string]interface{}, error) { + body := pc.PostBody() + read := make(map[string]interface{}) + err := json.Unmarshal(body, &read) + if err != nil { + pc.Error(expectedJSON, fasthttp.StatusBadRequest) + } + return read, err +} + +// TODO: Add routing ;) +type FastHttpHandlers struct { + POST func(PostContext) + GET func(GetContext) +} + +type FastHttpHandler struct { + handlers FastHttpHandlers + Context Context +} + +func (h *FastHttpHandler) Get(path string, handler func(GetContext)) { + h.handlers.GET = handler +} + +func (h *FastHttpHandler) Post(path string, handler func(PostContext)) { + h.handlers.POST = handler +} + +const ( + methodNotAllowed = `{ "error": "method not allowed" }` + contentType = `application/json` +) + +func (h *FastHttpHandler) compile() func(*fasthttp.RequestCtx) { + return func(ctx *fasthttp.RequestCtx) { + ctx.SetContentType(contentType) + httpCtx := FastHttpContext{ctx, h.Context} + if ctx.IsPost() { + h.handlers.POST(FastHttpPostContext{httpCtx}) + } else if ctx.IsGet() { + h.handlers.GET(FastHttpGetContext{httpCtx}) + } else { + ctx.Error(methodNotAllowed, fasthttp.StatusMethodNotAllowed) + } + } +} + +func (h *FastHttpHandler) Run(addr string) error { + return errors.Wrap(fasthttp.ListenAndServe(addr, h.compile()), "Starting fasthttp server") +} diff --git a/main.go b/main.go index fedbb01..b009432 100644 --- a/main.go +++ b/main.go @@ -14,9 +14,12 @@ func main() { app.Version = Version app.Action = func(c *cli.Context) error { config := Config{Keyspace: "archai_test", Hosts: []string{"127.0.0.1"}} - action := actions.ReadEventsToStream{Stream: "testing-stream", Output: os.Stdout} - err := action.Run(config) - return err + config.Actions = []actions.Action{ + //actions.WriteEventFromStream{Stream: "test-stream", Input: os.Stdin}, + actions.ReadEventsToStream{Stream: "test-stream", Output: os.Stdout}, + actions.HttpServer{Port: 8080}, + } + return config.Run() } app.Run(os.Args) diff --git a/persistence/provider.go b/persistence/provider.go index dd301ce..0e6b9f7 100644 --- a/persistence/provider.go +++ b/persistence/provider.go @@ -38,6 +38,7 @@ func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) { if err != nil { return &CassandraMigrationSession{}, errors.Wrap(err, "CreateSession failed") } + defer sess.Close() err = sess.Query(fmt.Sprintf(createKeySpace, cp.Keyspace)).Exec() if err != nil { return &CassandraMigrationSession{}, errors.Wrap(err, "Query to CreateKeyspace failed")