Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Commit

Permalink
Merge pull request #8 from nowthisnews/lczaplinski/refactor
Browse files Browse the repository at this point in the history
Simplify HttpServer logic
  • Loading branch information
scoiatael authored Mar 13, 2017
2 parents 96398fd + e0e30b0 commit bf4b12f
Show file tree
Hide file tree
Showing 18 changed files with 317 additions and 307 deletions.
61 changes: 61 additions & 0 deletions actions/bulk_write_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package actions

import (
"encoding/json"
"fmt"

"github.com/pkg/errors"
"github.com/scoiatael/archai/simplejson"
)

// BulkWriteJob persists a batch of positional rows onto a single stream.
// It is decoded directly from the bulk-write HTTP request body.
type BulkWriteJob struct {
// Column names; each entry is expected to be a string (enforced by makeObjectWithSchema).
Schema []interface{} `json:"schema"`
// Rows; each entry is expected to be a []interface{} aligned with Schema.
Objects []interface{} `json:"data"`
// Name of the stream the resulting events are written to.
Stream string `json:"stream"`
}

// MarshalJSON serializes the job under the same keys as the struct's JSON
// tags: "schema", "data" (Objects) and "stream".
func (wj BulkWriteJob) MarshalJSON() ([]byte, error) {
	view := map[string]interface{}{
		"schema": wj.Schema,
		"data":   wj.Objects,
		"stream": wj.Stream,
	}
	return json.Marshal(view)
}

// makeObjectWithSchema zips one positional row (obj, expected to be a
// []interface{}) with column names (schema, whose entries are expected to be
// strings) into a keyed object.
//
// It returns an error when obj is not an array, when a schema entry is not a
// string, or when the row has fewer values than the schema has names; extra
// row values beyond the schema are ignored.
func makeObjectWithSchema(obj interface{}, schema []interface{}) (simplejson.Object, error) {
	objectWithSchema := make(simplejson.Object)
	object, conv := obj.([]interface{})
	if !conv {
		return objectWithSchema, fmt.Errorf("failed to convert obj to array")
	}
	for j, rawName := range schema {
		// Avoid shadowing the range variable: keep the raw value and the
		// converted string under distinct names.
		name, conv := rawName.(string)
		if !conv {
			return objectWithSchema, fmt.Errorf("%d: failed to convert schema value to string", j)
		}
		if len(object) <= j {
			return objectWithSchema, fmt.Errorf("%d: not enough values", j)
		}
		objectWithSchema[name] = object[j]
	}
	return objectWithSchema, nil
}

// Run converts each positional row in the job into a keyed object and
// persists it as an event on wj.Stream, emitting a telemetry counter per
// successful write plus aggregate attempt/write counters. It stops and
// returns on the first row that fails to convert, marshal, or persist.
func (wj BulkWriteJob) Run(c Context) error {
	c.Telemetry().Incr("bulk_write.aggregate.attempt", []string{"stream:" + wj.Stream})
	for i, obj := range wj.Objects {
		object, err := makeObjectWithSchema(obj, wj.Schema)
		if err != nil {
			// errors.Wrapf replaces the Wrap+Sprintf combination.
			return errors.Wrapf(err, "HTTP server splitting payload to bulk_write event at %d", i)
		}
		payload, err := json.Marshal(object)
		if err != nil {
			return errors.Wrapf(err, "HTTP server marshalling payload to bulk_write event at %d", i)
		}
		if err := persistEvent(wj.Stream, payload, "http; bulk_write_job", c); err != nil {
			return errors.Wrap(err, "HTTP server bulk_writing events")
		}
		c.Telemetry().Incr("write", []string{"stream:" + wj.Stream})
	}
	c.Telemetry().Incr("bulk_write.aggregate.write", []string{"stream:" + wj.Stream})
	return nil
}
1 change: 1 addition & 0 deletions actions/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type HttpHandler interface {
}

type Context interface {
BackgroundJobs() chan Action
Persistence() persistence.Provider
Migrations() map[string]persistence.Migration
Version() string
Expand Down
16 changes: 16 additions & 0 deletions actions/handlers/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package handlers

import (
"github.com/scoiatael/archai/telemetry"
"github.com/scoiatael/archai/types"
)

// Context is the minimal surface the HTTP handlers need from the
// application: reading events, listing streams, and emitting metrics.
type Context interface {
// ReadEvents(stream, cursor, amount) returns up to amount events from stream.
ReadEvents(string, string, int) ([]types.Event, error)
// ListStreams returns the names of all known streams.
ListStreams() ([]string, error)
// Telemetry exposes the Datadog metrics client.
Telemetry() telemetry.Datadog
}

// Handler bundles the HTTP endpoint methods (GetStream, GetStreams) with the
// application Context they operate on, via embedding.
type Handler struct {
Context
}
44 changes: 44 additions & 0 deletions actions/handlers/get_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package handlers

import (
"github.com/pkg/errors"
"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/simplejson"
"github.com/scoiatael/archai/types"
)

// serializeEvents renders a page of events as a JSON-ready object: a
// "results" array of decoded payloads plus a "cursor" object whose "next"
// field holds the ID of the last event in the page (empty when there are no
// events).
func serializeEvents(events []types.Event) (simplejson.Object, error) {
	root := make(simplejson.Object)
	decoded := make([]simplejson.Object, len(events))
	for i, ev := range events {
		payload, err := simplejson.Read(ev.Blob)
		if err != nil {
			return root, errors.Wrap(err, "HTTP server marshalling response with read events")
		}
		decoded[i] = payload
	}
	cursor := make(simplejson.Object)
	if len(events) > 0 {
		cursor["next"] = events[len(events)-1].ID
	}
	root["results"] = decoded
	root["cursor"] = cursor
	return root, nil
}

// GetStream handles GET /stream/:id: it reads up to "amount" (default 10)
// events from the stream named by the :id segment, starting at the optional
// "cursor" query param, and responds with the serialized page. On any error
// it responds with a server error and sends nothing else.
func (gs Handler) GetStream(ctx http.GetContext) {
	stream := ctx.GetSegment("id")
	events, err := gs.Context.ReadEvents(
		stream,
		ctx.StringParam("cursor"),
		ctx.IntParam("amount", 10),
	)
	if err != nil {
		ctx.ServerErr(errors.Wrap(err, "GetStream Handle ReadEvents"))
		return
	}
	view, err := serializeEvents(events)
	if err != nil {
		ctx.ServerErr(err)
		// BUG FIX: previously fell through here and still called SendJson,
		// emitting a partial payload after the error response.
		return
	}
	gs.Context.Telemetry().Incr("read", []string{"stream:" + stream})
	ctx.SendJson(view)
}
18 changes: 18 additions & 0 deletions actions/handlers/get_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package handlers

import (
"github.com/pkg/errors"
"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/simplejson"
)

// GetStreams handles GET /streams: it responds with the names of all known
// streams under a "streams" key, or a server error if listing fails.
func (gs Handler) GetStreams(ctx http.GetContext) {
	streams, err := gs.Context.ListStreams()
	if err != nil {
		ctx.ServerErr(errors.Wrap(err, "GetStreams Handle .ListStreams"))
		return
	}
	ctx.SendJson(simplejson.Object{"streams": streams})
}
183 changes: 29 additions & 154 deletions actions/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package actions
import (
"encoding/json"
"fmt"
"log"
"reflect"
"time"

"github.com/pkg/errors"
"github.com/scoiatael/archai/actions/handlers"
"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/simplejson"
"github.com/scoiatael/archai/types"
)

// TODO: This should not be an action. Maybe introduce Job type?
Expand All @@ -18,183 +16,60 @@ type HttpServer struct {
Port int
}

type BackgroundJob interface {
Run(c Context)
type HandlerContext struct {
Context
}

// persistEvent writes payload to stream as a WriteEvent action, retrying up
// to c.Retries() times with a c.Backoff(i) sleep and a retry metric between
// attempts. It returns the error from the last attempt (nil on success).
func persistEvent(stream string, payload []byte, origin string, c Context) error {
	var err error
	for i := 0; i < c.Retries(); i++ {
		action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)}
		action.Meta["origin"] = origin
		action.Meta["compressed"] = "false"
		// BUG FIX: string(time.Now().Unix()) converts the int64 to the rune
		// at that code point, not a decimal string; format it explicitly.
		action.Meta["time"] = fmt.Sprintf("%d", time.Now().Unix())
		err = action.Run(c)
		if err == nil {
			break
		}
		time.Sleep(c.Backoff(i))
		// Tag fixed to "stream:<name>" for consistency with every other metric.
		c.Telemetry().Incr("persist.retries", []string{"stream:" + stream})
		log.Println("Retrying persistEvent, because of", err)
	}
	return err
}

type WriteJob struct {
payload simplejson.Object
stream string
// ReadEvents satisfies handlers.Context by delegating to the ReadEvents
// action, returning whatever events it collected along with any wrapped
// run error.
func (c HandlerContext) ReadEvents(stream string, cursor string, amount int) ([]types.Event, error) {
	action := ReadEvents{Stream: stream, Cursor: cursor, Amount: amount}
	if err := action.Run(c); err != nil {
		return action.Events, errors.Wrap(err, "HandlerContext ReadEvents .Run")
	}
	return action.Events, nil
}

func (wj WriteJob) Run(c Context) {
payload, err := json.Marshal(wj.payload)
err = errors.Wrap(err, "HTTP server marshalling payload to write event")
func (c HandlerContext) ListStreams() ([]string, error) {
session, err := c.Persistence().Session()
if err != nil {
c.HandleErr(err)
return
}
err = persistEvent(wj.stream, payload, "http; write_job", c)
if err != nil {
c.HandleErr(errors.Wrap(err, "HTTP server writing event"))
} else {
c.Telemetry().Incr("write", []string{"stream:" + wj.stream})
}
}

type BulkWriteJob struct {
schema []interface{}
objects []interface{}
stream string
}

func makeObjectWithSchema(obj interface{}, schema []interface{}) (simplejson.Object, error) {
object_with_schema := make(simplejson.Object)
object, conv := obj.([]interface{})
if !conv {
return object_with_schema, fmt.Errorf("Failed to convert obj to array")
}
for j, name := range schema {
name, conv := name.(string)
if !conv {
return object_with_schema, fmt.Errorf("%d: Failed to convert schema value to string", j)
}
if len(object) <= j {
return object_with_schema, fmt.Errorf("%d: Not enough values", j)
}
object_with_schema[name] = object[j]
}
return object_with_schema, nil
}

func (wj BulkWriteJob) Run(c Context) {
c.Telemetry().Incr("bulk_write.aggregate", []string{"stream:" + wj.stream})
for i, obj := range wj.objects {
object, err := makeObjectWithSchema(obj, wj.schema)
err = errors.Wrap(err, fmt.Sprintf("HTTP server splitting payload to bulk_write event at %d", i))
if err != nil {
c.HandleErr(err)
return
}
payload, err := json.Marshal(object)
err = errors.Wrap(err, fmt.Sprintf("HTTP server marshalling payload to bulk_write event at %d", i))
if err != nil {
c.HandleErr(err)
return
}
err = persistEvent(wj.stream, payload, "http; bulk_write_job", c)
if err != nil {
c.HandleErr(errors.Wrap(err, "HTTP server bulk_writing events"))
} else {
c.Telemetry().Incr("write", []string{"stream:" + wj.stream})
}
}
}

func writer(jobs <-chan BackgroundJob, c Context) {
for j := range jobs {
j.Run(c)
return []string{}, errors.Wrap(err, "HandlerContext ListStreams .Persistence.Session")
}
return session.ListStreams()
}

func (hs HttpServer) Run(c Context) error {
handler := c.HttpHandler()
jobs := make(chan BackgroundJob, 50)
for w := 0; w < c.Concurrency(); w++ {
go writer(jobs, c)
}
handler.Get("/_check", func(ctx http.GetContext) {
ctx.SendJson("OK")
})
handler.Get("/streams", func(ctx http.GetContext) {
session, err := c.Persistence().Session()
err = errors.Wrap(err, "Obtaining session failed")
if err != nil {
ctx.ServerErr(err)
return
}
streams, err := session.ListStreams()
view := make(simplejson.Object)
view["streams"] = streams
ctx.SendJson(view)
})
handler.Get("/stream/:id", func(ctx http.GetContext) {
stream := ctx.GetSegment("id")
action := ReadEvents{Stream: stream}
action.Cursor = ctx.StringParam("cursor")
action.Amount = ctx.IntParam("amount", 10)
err := errors.Wrap(action.Run(c), "HTTP server reading events")
if err != nil {
ctx.ServerErr(err)
return
}
root := make(simplejson.Object)
events := make(simplejson.ObjectArray, len(action.Events))
for i, ev := range action.Events {
events[i] = make(simplejson.Object)
events[i]["ID"] = ev.ID
payload, err := simplejson.Read(ev.Blob)
err = errors.Wrap(err, "HTTP server marshalling response with read events")
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
}
events[i]["blob"] = payload
}
root["results"] = events
c.Telemetry().Incr("read", []string{"stream:" + stream})
ctx.SendJson(root)
})
jobs := c.BackgroundJobs()
handler_context := handlers.Handler{HandlerContext{c}}
handler.Get("/_check", func(ctx http.GetContext) { ctx.SendJson("OK") })
handler.Get("/streams", handler_context.GetStreams)
handler.Get("/stream/:id", handler_context.GetStream)
handler.Post("/bulk/stream/:id", func(ctx http.PostContext) {
var err error
stream := ctx.GetSegment("id")
body, err := ctx.JsonBodyParams()

job := BulkWriteJob{}
err = ctx.ReadJSON(&job)

if err != nil {
// Error was already sent
return
}
objects, conv := body["data"].([]interface{})
if !conv {
ctx.ServerErr(fmt.Errorf("'data' field is not an Array (is %v)", reflect.TypeOf(body["data"])))
return
}
schema, conv := body["schema"].([]interface{})
if !conv {
ctx.ServerErr(fmt.Errorf("'schema' field is not an Array (is %v)", reflect.TypeOf(body["schema"])))
ctx.ServerErr(fmt.Errorf("Expected body, encountered: %v", err))
return
}

jobs <- BulkWriteJob{stream: stream, objects: objects, schema: schema}
job.Stream = stream

jobs <- job
c.Telemetry().Gauge("jobs.len", []string{}, len(jobs))
ctx.SendJson("OK")
})
handler.Post("/stream/:id", func(ctx http.PostContext) {
var err error
stream := ctx.GetSegment("id")
body, err := ctx.JsonBodyParams()
if err != nil {
// Error was already sent
ctx.ServerErr(fmt.Errorf("Expected body, encountered: %v", err))
return
}

jobs <- WriteJob{stream: stream, payload: body}
jobs <- WriteJob{Stream: stream, Payload: body}
c.Telemetry().Gauge("jobs.len", []string{}, len(jobs))
ctx.SendJson("OK")
})

Expand Down
Loading

0 comments on commit bf4b12f

Please sign in to comment.