Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Simplify HttpServer logic #8

Merged
merged 1 commit into from
Mar 13, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions actions/bulk_write_job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package actions

import (
"encoding/json"
"fmt"

"github.com/pkg/errors"
"github.com/scoiatael/archai/simplejson"
)

type BulkWriteJob struct {
Schema []interface{} `json:"schema"`
Objects []interface{} `json:"data"`
Stream string `json:"stream"`
}

func (wj BulkWriteJob) MarshalJSON() ([]byte, error) {
return json.Marshal(map[string]interface{}{"schema": wj.Schema,
"data": wj.Objects,
"stream": wj.Stream,
})
}

func makeObjectWithSchema(obj interface{}, schema []interface{}) (simplejson.Object, error) {
object_with_schema := make(simplejson.Object)
object, conv := obj.([]interface{})
if !conv {
return object_with_schema, fmt.Errorf("Failed to convert obj to array")
}
for j, name := range schema {
name, conv := name.(string)
if !conv {
return object_with_schema, fmt.Errorf("%d: Failed to convert schema value to string", j)
}
if len(object) <= j {
return object_with_schema, fmt.Errorf("%d: Not enough values", j)
}
object_with_schema[name] = object[j]
}
return object_with_schema, nil
}

func (wj BulkWriteJob) Run(c Context) error {
c.Telemetry().Incr("bulk_write.aggregate.attempt", []string{"stream:" + wj.Stream})
for i, obj := range wj.Objects {
object, err := makeObjectWithSchema(obj, wj.Schema)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("HTTP server splitting payload to bulk_write event at %d", i))
}
payload, err := json.Marshal(object)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("HTTP server marshalling payload to bulk_write event at %d", i))
}
if err := persistEvent(wj.Stream, payload, "http; bulk_write_job", c); err != nil {
return errors.Wrap(err, "HTTP server bulk_writing events")
}
c.Telemetry().Incr("write", []string{"stream:" + wj.Stream})
}
c.Telemetry().Incr("bulk_write.aggregate.write", []string{"stream:" + wj.Stream})
return nil
}
1 change: 1 addition & 0 deletions actions/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type HttpHandler interface {
}

type Context interface {
BackgroundJobs() chan Action
Persistence() persistence.Provider
Migrations() map[string]persistence.Migration
Version() string
Expand Down
16 changes: 16 additions & 0 deletions actions/handlers/context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package handlers

import (
"github.com/scoiatael/archai/telemetry"
"github.com/scoiatael/archai/types"
)

type Context interface {
ReadEvents(string, string, int) ([]types.Event, error)
ListStreams() ([]string, error)
Telemetry() telemetry.Datadog
}

type Handler struct {
Context
}
44 changes: 44 additions & 0 deletions actions/handlers/get_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package handlers

import (
"github.com/pkg/errors"
"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/simplejson"
"github.com/scoiatael/archai/types"
)

func serializeEvents(events []types.Event) (simplejson.Object, error) {
root := make(simplejson.Object)
results := make([]simplejson.Object, len(events))
cursor := make(simplejson.Object)
for i, ev := range events {
payload, err := simplejson.Read(ev.Blob)
if err != nil {
return root, errors.Wrap(err, "HTTP server marshalling response with read events")
}
results[i] = payload
cursor["next"] = ev.ID
}
root["results"] = results
root["cursor"] = cursor
return root, nil
}

func (gs Handler) GetStream(ctx http.GetContext) {
stream := ctx.GetSegment("id")
events, err := gs.Context.ReadEvents(
stream,
ctx.StringParam("cursor"),
ctx.IntParam("amount", 10),
)
if err != nil {
ctx.ServerErr(errors.Wrap(err, "GetStream Handle ReadEvents"))
return
}
json, err := serializeEvents(events)
if err != nil {
ctx.ServerErr(err)
}
gs.Context.Telemetry().Incr("read", []string{"stream:" + stream})
ctx.SendJson(json)
}
18 changes: 18 additions & 0 deletions actions/handlers/get_streams.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package handlers

import (
"github.com/pkg/errors"
"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/simplejson"
)

func (gs Handler) GetStreams(ctx http.GetContext) {
streams, err := gs.Context.ListStreams()
if err != nil {
ctx.ServerErr(errors.Wrap(err, "GetStreams Handle .ListStreams"))
return
}
view := make(simplejson.Object)
view["streams"] = streams
ctx.SendJson(view)
}
183 changes: 29 additions & 154 deletions actions/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package actions
import (
"encoding/json"
"fmt"
"log"
"reflect"
"time"

"github.com/pkg/errors"
"github.com/scoiatael/archai/actions/handlers"
"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/simplejson"
"github.com/scoiatael/archai/types"
)

// TODO: This should not be an action. Maybe introduce Job type?
Expand All @@ -18,183 +16,60 @@ type HttpServer struct {
Port int
}

type BackgroundJob interface {
Run(c Context)
type HandlerContext struct {
Context
}

func persistEvent(stream string, payload []byte, origin string, c Context) error {
var err error
for i := 0; i < c.Retries(); i += 1 {
action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)}
action.Meta["origin"] = origin
action.Meta["compressed"] = "false"
action.Meta["time"] = string(time.Now().Unix())
err = action.Run(c)
if err == nil {
break
}
time.Sleep(c.Backoff(i))
c.Telemetry().Incr("persist.retries", []string{"stream" + stream})
log.Println("Retrying persistEvent, because of", err)
}
return err
}

type WriteJob struct {
payload simplejson.Object
stream string
func (c HandlerContext) ReadEvents(stream string, cursor string, amount int) ([]types.Event, error) {
re := ReadEvents{Stream: stream, Cursor: cursor, Amount: amount}
err := re.Run(c)
return re.Events, errors.Wrap(err, "HandlerContext ReadEvents .Run")
}

func (wj WriteJob) Run(c Context) {
payload, err := json.Marshal(wj.payload)
err = errors.Wrap(err, "HTTP server marshalling payload to write event")
func (c HandlerContext) ListStreams() ([]string, error) {
session, err := c.Persistence().Session()
if err != nil {
c.HandleErr(err)
return
}
err = persistEvent(wj.stream, payload, "http; write_job", c)
if err != nil {
c.HandleErr(errors.Wrap(err, "HTTP server writing event"))
} else {
c.Telemetry().Incr("write", []string{"stream:" + wj.stream})
}
}

type BulkWriteJob struct {
schema []interface{}
objects []interface{}
stream string
}

func makeObjectWithSchema(obj interface{}, schema []interface{}) (simplejson.Object, error) {
object_with_schema := make(simplejson.Object)
object, conv := obj.([]interface{})
if !conv {
return object_with_schema, fmt.Errorf("Failed to convert obj to array")
}
for j, name := range schema {
name, conv := name.(string)
if !conv {
return object_with_schema, fmt.Errorf("%d: Failed to convert schema value to string", j)
}
if len(object) <= j {
return object_with_schema, fmt.Errorf("%d: Not enough values", j)
}
object_with_schema[name] = object[j]
}
return object_with_schema, nil
}

func (wj BulkWriteJob) Run(c Context) {
c.Telemetry().Incr("bulk_write.aggregate", []string{"stream:" + wj.stream})
for i, obj := range wj.objects {
object, err := makeObjectWithSchema(obj, wj.schema)
err = errors.Wrap(err, fmt.Sprintf("HTTP server splitting payload to bulk_write event at %d", i))
if err != nil {
c.HandleErr(err)
return
}
payload, err := json.Marshal(object)
err = errors.Wrap(err, fmt.Sprintf("HTTP server marshalling payload to bulk_write event at %d", i))
if err != nil {
c.HandleErr(err)
return
}
err = persistEvent(wj.stream, payload, "http; bulk_write_job", c)
if err != nil {
c.HandleErr(errors.Wrap(err, "HTTP server bulk_writing events"))
} else {
c.Telemetry().Incr("write", []string{"stream:" + wj.stream})
}
}
}

func writer(jobs <-chan BackgroundJob, c Context) {
for j := range jobs {
j.Run(c)
return []string{}, errors.Wrap(err, "HandlerContext ListStreams .Persistence.Session")
}
return session.ListStreams()
}

func (hs HttpServer) Run(c Context) error {
handler := c.HttpHandler()
jobs := make(chan BackgroundJob, 50)
for w := 0; w < c.Concurrency(); w++ {
go writer(jobs, c)
}
handler.Get("/_check", func(ctx http.GetContext) {
ctx.SendJson("OK")
})
handler.Get("/streams", func(ctx http.GetContext) {
session, err := c.Persistence().Session()
err = errors.Wrap(err, "Obtaining session failed")
if err != nil {
ctx.ServerErr(err)
return
}
streams, err := session.ListStreams()
view := make(simplejson.Object)
view["streams"] = streams
ctx.SendJson(view)
})
handler.Get("/stream/:id", func(ctx http.GetContext) {
stream := ctx.GetSegment("id")
action := ReadEvents{Stream: stream}
action.Cursor = ctx.StringParam("cursor")
action.Amount = ctx.IntParam("amount", 10)
err := errors.Wrap(action.Run(c), "HTTP server reading events")
if err != nil {
ctx.ServerErr(err)
return
}
root := make(simplejson.Object)
events := make(simplejson.ObjectArray, len(action.Events))
for i, ev := range action.Events {
events[i] = make(simplejson.Object)
events[i]["ID"] = ev.ID
payload, err := simplejson.Read(ev.Blob)
err = errors.Wrap(err, "HTTP server marshalling response with read events")
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
}
events[i]["blob"] = payload
}
root["results"] = events
c.Telemetry().Incr("read", []string{"stream:" + stream})
ctx.SendJson(root)
})
jobs := c.BackgroundJobs()
handler_context := handlers.Handler{HandlerContext{c}}
handler.Get("/_check", func(ctx http.GetContext) { ctx.SendJson("OK") })
handler.Get("/streams", handler_context.GetStreams)
handler.Get("/stream/:id", handler_context.GetStream)
handler.Post("/bulk/stream/:id", func(ctx http.PostContext) {
var err error
stream := ctx.GetSegment("id")
body, err := ctx.JsonBodyParams()

job := BulkWriteJob{}
err = ctx.ReadJSON(&job)

if err != nil {
// Error was already sent
return
}
objects, conv := body["data"].([]interface{})
if !conv {
ctx.ServerErr(fmt.Errorf("'data' field is not an Array (is %v)", reflect.TypeOf(body["data"])))
return
}
schema, conv := body["schema"].([]interface{})
if !conv {
ctx.ServerErr(fmt.Errorf("'schema' field is not an Array (is %v)", reflect.TypeOf(body["schema"])))
ctx.ServerErr(fmt.Errorf("Expected body, encountered: %v", err))
return
}

jobs <- BulkWriteJob{stream: stream, objects: objects, schema: schema}
job.Stream = stream

jobs <- job
c.Telemetry().Gauge("jobs.len", []string{}, len(jobs))
ctx.SendJson("OK")
})
handler.Post("/stream/:id", func(ctx http.PostContext) {
var err error
stream := ctx.GetSegment("id")
body, err := ctx.JsonBodyParams()
if err != nil {
// Error was already sent
ctx.ServerErr(fmt.Errorf("Expected body, encountered: %v", err))
return
}

jobs <- WriteJob{stream: stream, payload: body}
jobs <- WriteJob{Stream: stream, Payload: body}
c.Telemetry().Gauge("jobs.len", []string{}, len(jobs))
ctx.SendJson("OK")
})

Expand Down
Loading