From 8f811b725a279f7ed91b07591e7b816f190a0a26 Mon Sep 17 00:00:00 2001 From: scoiatael Date: Wed, 8 Mar 2017 14:03:34 +0100 Subject: [PATCH] Use concurrent writer pool --- actions/http_server.go | 52 ++++++++++++++++++++++++++++-------------- http/context.go | 6 ++++- http/fast_http.go | 6 +++-- http/iris.go | 6 +++-- 4 files changed, 48 insertions(+), 22 deletions(-) diff --git a/actions/http_server.go b/actions/http_server.go index 17246d2..ea869eb 100644 --- a/actions/http_server.go +++ b/actions/http_server.go @@ -14,8 +14,40 @@ type HttpServer struct { Port int } +type WriteJob struct { + payload simplejson.Object + stream string +} + +func writer(jobs <-chan WriteJob, c Context) { + for j := range jobs { + j.Run(c) + } +} + +func (wj *WriteJob) Run(c Context) { + payload, err := json.Marshal(wj.payload) + if err != nil { + c.HandleErr(err) + return + } + action := WriteEvent{Stream: wj.stream, Payload: payload, Meta: make(map[string]string)} + action.Meta["origin"] = "http" + action.Meta["compressed"] = "false" + err = action.Run(c) + if err != nil { + c.HandleErr(err) + } else { + c.Telemetry().Incr("write", []string{"stream:" + wj.stream}) + } +} + func (hs HttpServer) Run(c Context) error { handler := c.HttpHandler() + jobs := make(chan WriteJob, 50) + for w := 0; w < 3; w++ { + go writer(jobs, c) + } handler.Get("/stream/:id", func(ctx http.GetContext) { stream := ctx.GetSegment("id") action := ReadEvents{Stream: stream} @@ -51,23 +83,9 @@ func (hs HttpServer) Run(c Context) error { // Error was already sent return } - payload, err := json.Marshal(body) - if err != nil { - c.HandleErr(err) - ctx.ServerErr(err) - return - } - action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)} - action.Meta["origin"] = "http" - action.Meta["compressed"] = "false" - err = action.Run(c) - if err != nil { - c.HandleErr(err) - ctx.ServerErr(err) - } else { - c.Telemetry().Incr("write", []string{"stream:" + stream}) - ctx.SendJson("OK") - } + + jobs <- WriteJob{stream: stream, payload: body} + ctx.SendJson("OK") }) connString := fmt.Sprintf("%s:%d", hs.Addr, hs.Port) diff --git a/http/context.go b/http/context.go index 4d0e525..347802c 100644 --- a/http/context.go +++ b/http/context.go @@ -1,5 +1,9 @@ package http +import ( + "github.com/scoiatael/archai/simplejson" +) + type Context interface { HandleErr(error) } @@ -18,5 +22,5 @@ type GetContext interface { type PostContext interface { HttpContext - JsonBodyParams() (map[string]interface{}, error) + JsonBodyParams() (simplejson.Object, error) } diff --git a/http/fast_http.go b/http/fast_http.go index d31ecbd..42214fb 100644 --- a/http/fast_http.go +++ b/http/fast_http.go @@ -8,6 +8,8 @@ import ( "github.com/pkg/errors" "github.com/valyala/fasthttp" + + "github.com/scoiatael/archai/simplejson" ) type FastHttpContext struct { @@ -63,9 +65,9 @@ type FastHttpPostContext struct { const expectedJSON = `{ "error": "expected JSON body" }` -func (pc FastHttpPostContext) JsonBodyParams() (map[string]interface{}, error) { +func (pc FastHttpPostContext) JsonBodyParams() (simplejson.Object, error) { body := pc.PostBody() - read := make(map[string]interface{}) + read := make(simplejson.Object) err := json.Unmarshal(body, &read) if err != nil { pc.Error(expectedJSON, fasthttp.StatusBadRequest) diff --git a/http/iris.go b/http/iris.go index 9ae44a3..e541cab 100644 --- a/http/iris.go +++ b/http/iris.go @@ -2,6 +2,8 @@ package http import ( "gopkg.in/kataras/iris.v6" + + "github.com/scoiatael/archai/simplejson" ) type IrisHttpContext struct { @@ -43,8 +45,8 @@ type IrisPostContext struct { IrisHttpContext } -func (hc IrisPostContext) JsonBodyParams() (map[string]interface{}, error) { - sess := iris.Map{} +func (hc IrisPostContext) JsonBodyParams() (simplejson.Object, error) { + sess := make(simplejson.Object) err := hc.ReadJSON(&sess) if err != nil {