Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Commit

Permalink
Quick implementation of Http server
Browse files Browse the repository at this point in the history
  • Loading branch information
scoiatael committed Feb 18, 2017
1 parent f98d820 commit e018e88
Show file tree
Hide file tree
Showing 9 changed files with 261 additions and 12 deletions.
9 changes: 9 additions & 0 deletions actions/context.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package actions

import (
"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/persistence"
)

type HttpHandler interface {
Get(string, func(http.GetContext))
Post(string, func(http.PostContext))
Run(string) error
}

type Context interface {
Persistence() persistence.Provider
Migrations() map[string]persistence.Migration
Version() string
HandleErr(error)
HttpHandler() HttpHandler
}

type Action interface {
Expand Down
74 changes: 74 additions & 0 deletions actions/http_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package actions

import (
"fmt"

"encoding/json"

"log"

"github.com/pkg/errors"
"github.com/scoiatael/archai/http"
)

type HttpServer struct {
Addr string
Port int
}

func (hs HttpServer) Run(c Context) error {
handler := c.HttpHandler()
handler.Get("/stream/:id", func(ctx http.GetContext) {
stream := ctx.GetSegment(":id")
action := ReadEvents{Stream: stream}
action.Cursor = ctx.StringParam("stream")
action.Amount = ctx.IntParam("amount", 10)
err := action.Run(c)
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
} else {
events := make([]map[string]interface{}, len(action.Events))
for i, ev := range action.Events {
events[i] = make(map[string]interface{})
events[i]["ID"] = ev.ID
payload := make(map[string]interface{})
err := json.Unmarshal(ev.Blob, &payload)
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
}
events[i]["blob"] = payload
}
ctx.SendJson(events)
}
})
handler.Post("/stream/:id", func(ctx http.PostContext) {
var err error
stream := ctx.GetSegment(":id")
body, err := ctx.JsonBodyParams()
if err != nil {
// Error was already send
return
}
log.Println("Got ", body)
payload, err := json.Marshal(body)
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
return
}
action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)}
action.Meta["origin"] = "http"
action.Meta["compressed"] = "false"
err = action.Run(c)
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
} else {
ctx.SendJson("OK")
}
})
connString := fmt.Sprintf("%s:%d", hs.Addr, hs.Port)
return errors.Wrap(handler.Run(connString), "HttpServer starting..")
}
8 changes: 3 additions & 5 deletions actions/read_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ type ReadEvents struct {
Cursor string
Amount int

Output chan (types.Event)
Events []types.Event
}

const minTimeuuid = "00000000-0000-1000-8080-808080808080"

func (re ReadEvents) Run(c Context) error {
func (re *ReadEvents) Run(c Context) error {
persistenceProvider := c.Persistence()
session, err := persistenceProvider.Session()
if err != nil {
Expand All @@ -28,8 +28,6 @@ func (re ReadEvents) Run(c Context) error {
re.Cursor = minTimeuuid
}
events, err := session.ReadEvents(re.Stream, re.Cursor, re.Amount)
for _, ev := range events {
re.Output <- ev
}
re.Events = events
return errors.Wrap(err, fmt.Sprintf("Error reading event from stream %s", re.Stream))
}
9 changes: 5 additions & 4 deletions actions/read_events_to_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
type ReadEventsToStream struct {
Stream string
Cursor string
Amount int

Output io.Writer
}
Expand All @@ -23,12 +22,14 @@ func printEventToStream(out io.Writer, ev types.Event) error {
}

func (res ReadEventsToStream) Run(c Context) error {
ch := make(chan (types.Event), 100)
err := ReadEvents{Stream: res.Stream, Cursor: res.Cursor, Amount: 10, Output: ch}.Run(c)
action := ReadEvents{Stream: res.Stream, Cursor: res.Cursor, Amount: 10}
err := action.Run(c)
events := action.Events
if err != nil {
return err
}
for ev := range ch {
res.Output.Write([]byte(fmt.Sprintln("STREAM -- ID -- Meta -- Blob")))
for _, ev := range events {
err := printEventToStream(res.Output, ev)
if err != nil {
return err
Expand Down
22 changes: 22 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package main

import (
"log"

"github.com/scoiatael/archai/actions"
"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/persistence"
)

Expand All @@ -12,6 +15,11 @@ type Config struct {
Actions []actions.Action
}

func (c Config) HandleErr(err error) {
log.Print(err)
panic(err)
}

func (c Config) Migrations() map[string]persistence.Migration {
m := make(map[string]persistence.Migration)
m["create_events_table"] = persistence.CreateEventsTable
Expand All @@ -27,3 +35,17 @@ func (c Config) Persistence() persistence.Provider {
func (c Config) Version() string {
return Version
}

func (c Config) HttpHandler() actions.HttpHandler {
return &http.FastHttpHandler{Context: c}
}

func (c Config) Run() error {
for _, a := range c.Actions {
err := a.Run(c)
if err != nil {
return err
}
}
return nil
}
22 changes: 22 additions & 0 deletions http/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package http

type Context interface {
HandleErr(error)
}

type HttpContext interface {
SendJson(interface{})
GetSegment(string) string
ServerErr(error)
}

type GetContext interface {
HttpContext
StringParam(string) string
IntParam(string, int) int
}

type PostContext interface {
HttpContext
JsonBodyParams() (map[string]interface{}, error)
}
119 changes: 119 additions & 0 deletions http/fast_http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package http

import (
"encoding/json"

"fmt"

"strings"

"strconv"

"github.com/pkg/errors"
"github.com/valyala/fasthttp"
)

type FastHttpContext struct {
*fasthttp.RequestCtx
Context Context
}

func (hc FastHttpContext) ServerErr(err error) {
hc.Context.HandleErr(err)
hc.Error(fmt.Sprintf(`{ "error": "%v" }`, err), fasthttp.StatusInternalServerError)
}

func (hc FastHttpContext) SendJson(response interface{}) {
dump, err := json.Marshal(response)
if err != nil {
hc.ServerErr(err)
} else {
hc.SetBody(dump)
}
}

// TODO: Do this normal way
func (hc FastHttpContext) GetSegment(index string) string {
segments := strings.Split(string(hc.Path()), "/")
if len(segments) == 0 {
return ""
} else {
return segments[len(segments)-1]
}
}

type FastHttpGetContext struct {
FastHttpContext
}

func (gc FastHttpGetContext) StringParam(name string) string {
val := gc.QueryArgs().Peek(name)
return string(val)
}

func (gc FastHttpGetContext) IntParam(name string, def int) int {
val := gc.QueryArgs().Peek(name)
i, err := strconv.Atoi(string(val))
if err != nil {
return def
}
return i
}

type FastHttpPostContext struct {
FastHttpContext
}

const expectedJSON = `{ "error": "expected JSON body" }`

func (pc FastHttpPostContext) JsonBodyParams() (map[string]interface{}, error) {
body := pc.PostBody()
read := make(map[string]interface{})
err := json.Unmarshal(body, &read)
if err != nil {
pc.Error(expectedJSON, fasthttp.StatusBadRequest)
}
return read, err
}

// TODO: Add routing ;)
type FastHttpHandlers struct {
POST func(PostContext)
GET func(GetContext)
}

type FastHttpHandler struct {
handlers FastHttpHandlers
Context Context
}

func (h *FastHttpHandler) Get(path string, handler func(GetContext)) {
h.handlers.GET = handler
}

func (h *FastHttpHandler) Post(path string, handler func(PostContext)) {
h.handlers.POST = handler
}

const (
methodNotAllowed = `{ "error": "method not allowed" }`
contentType = `application/json`
)

func (h *FastHttpHandler) compile() func(*fasthttp.RequestCtx) {
return func(ctx *fasthttp.RequestCtx) {
ctx.SetContentType(contentType)
httpCtx := FastHttpContext{ctx, h.Context}
if ctx.IsPost() {
h.handlers.POST(FastHttpPostContext{httpCtx})
} else if ctx.IsGet() {
h.handlers.GET(FastHttpGetContext{httpCtx})
} else {
ctx.Error(methodNotAllowed, fasthttp.StatusMethodNotAllowed)
}
}
}

func (h *FastHttpHandler) Run(addr string) error {
return errors.Wrap(fasthttp.ListenAndServe(addr, h.compile()), "Starting fasthttp server")
}
9 changes: 6 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ func main() {
app.Version = Version
app.Action = func(c *cli.Context) error {
config := Config{Keyspace: "archai_test", Hosts: []string{"127.0.0.1"}}
action := actions.ReadEventsToStream{Stream: "testing-stream", Output: os.Stdout}
err := action.Run(config)
return err
config.Actions = []actions.Action{
//actions.WriteEventFromStream{Stream: "test-stream", Input: os.Stdin},
actions.ReadEventsToStream{Stream: "test-stream", Output: os.Stdout},
actions.HttpServer{Port: 8080},
}
return config.Run()
}

app.Run(os.Args)
Expand Down
1 change: 1 addition & 0 deletions persistence/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func (cp *CassandraProvider) MigrationSession() (MigrationSession, error) {
if err != nil {
return &CassandraMigrationSession{}, errors.Wrap(err, "CreateSession failed")
}
defer sess.Close()
err = sess.Query(fmt.Sprintf(createKeySpace, cp.Keyspace)).Exec()
if err != nil {
return &CassandraMigrationSession{}, errors.Wrap(err, "Query to CreateKeyspace failed")
Expand Down

0 comments on commit e018e88

Please sign in to comment.