Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Commit

Permalink
Use concurrent writer pool
Browse files Browse the repository at this point in the history
  • Loading branch information
scoiatael committed Mar 8, 2017
1 parent 5dee135 commit 8f811b7
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 22 deletions.
52 changes: 35 additions & 17 deletions actions/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,40 @@ type HttpServer struct {
Port int
}

type WriteJob struct {
payload simplejson.Object
stream string
}

func writer(jobs <-chan WriteJob, c Context) {
for j := range jobs {
j.Run(c)
}
}

func (wj *WriteJob) Run(c Context) {
payload, err := json.Marshal(wj.payload)
if err != nil {
c.HandleErr(err)
return
}
action := WriteEvent{Stream: wj.stream, Payload: payload, Meta: make(map[string]string)}
action.Meta["origin"] = "http"
action.Meta["compressed"] = "false"
err = action.Run(c)
if err != nil {
c.HandleErr(err)
} else {
c.Telemetry().Incr("write", []string{"stream:" + wj.stream})
}
}

func (hs HttpServer) Run(c Context) error {
handler := c.HttpHandler()
jobs := make(chan WriteJob, 50)
for w := 0; w < 3; w++ {
go writer(jobs, c)
}
handler.Get("/stream/:id", func(ctx http.GetContext) {
stream := ctx.GetSegment("id")
action := ReadEvents{Stream: stream}
Expand Down Expand Up @@ -51,23 +83,9 @@ func (hs HttpServer) Run(c Context) error {
// Error was already sent
return
}
payload, err := json.Marshal(body)
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
return
}
action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)}
action.Meta["origin"] = "http"
action.Meta["compressed"] = "false"
err = action.Run(c)
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
} else {
c.Telemetry().Incr("write", []string{"stream:" + stream})
ctx.SendJson("OK")
}

jobs <- WriteJob{stream: stream, payload: body}
ctx.SendJson("OK")
})

connString := fmt.Sprintf("%s:%d", hs.Addr, hs.Port)
Expand Down
6 changes: 5 additions & 1 deletion http/context.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package http

import (
"github.com/scoiatael/archai/simplejson"
)

type Context interface {
HandleErr(error)
}
Expand All @@ -18,5 +22,5 @@ type GetContext interface {

type PostContext interface {
HttpContext
JsonBodyParams() (map[string]interface{}, error)
JsonBodyParams() (simplejson.Object, error)
}
6 changes: 4 additions & 2 deletions http/fast_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (

"github.com/pkg/errors"
"github.com/valyala/fasthttp"

"github.com/scoiatael/archai/simplejson"
)

type FastHttpContext struct {
Expand Down Expand Up @@ -63,9 +65,9 @@ type FastHttpPostContext struct {

const expectedJSON = `{ "error": "expected JSON body" }`

func (pc FastHttpPostContext) JsonBodyParams() (map[string]interface{}, error) {
func (pc FastHttpPostContext) JsonBodyParams() (simplejson.Object, error) {
body := pc.PostBody()
read := make(map[string]interface{})
read := make(simplejson.Object)
err := json.Unmarshal(body, &read)
if err != nil {
pc.Error(expectedJSON, fasthttp.StatusBadRequest)
Expand Down
6 changes: 4 additions & 2 deletions http/iris.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package http

import (
"gopkg.in/kataras/iris.v6"

"github.com/scoiatael/archai/simplejson"
)

type IrisHttpContext struct {
Expand Down Expand Up @@ -43,8 +45,8 @@ type IrisPostContext struct {
IrisHttpContext
}

func (hc IrisPostContext) JsonBodyParams() (map[string]interface{}, error) {
sess := iris.Map{}
func (hc IrisPostContext) JsonBodyParams() (simplejson.Object, error) {
sess := make(simplejson.Object)
err := hc.ReadJSON(&sess)

if err != nil {
Expand Down

0 comments on commit 8f811b7

Please sign in to comment.