Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Commit

Permalink
Endpoint for bulk writes
Browse files Browse the repository at this point in the history
  • Loading branch information
scoiatael committed Mar 9, 2017
1 parent 375e087 commit ecec5b8
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 12 deletions.
107 changes: 95 additions & 12 deletions actions/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,110 @@ package actions
import (
"encoding/json"
"fmt"
"reflect"
"time"

"github.com/pkg/errors"
"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/simplejson"
)

// TODO: This should not be an action. Maybe introduce Job type?
type HttpServer struct {
Addr string
Port int
}

type BackgroundJob interface {
Run(c Context)
}

func persistEvent(stream string, payload []byte, origin string, c Context) error {
action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)}
action.Meta["origin"] = origin
action.Meta["compressed"] = "false"
action.Meta["time"] = string(time.Now().Unix())
return action.Run(c)
}

type WriteJob struct {
payload simplejson.Object
stream string
}

func writer(jobs <-chan WriteJob, c Context) {
for j := range jobs {
j.Run(c)
}
}

func (wj *WriteJob) Run(c Context) {
func (wj WriteJob) Run(c Context) {
payload, err := json.Marshal(wj.payload)
err = errors.Wrap(err, "HTTP server marshalling payload to write event")
if err != nil {
c.HandleErr(err)
return
}
action := WriteEvent{Stream: wj.stream, Payload: payload, Meta: make(map[string]string)}
action.Meta["origin"] = "http"
action.Meta["compressed"] = "false"
err = action.Run(c)
err = persistEvent(wj.stream, payload, "http; write_job", c)
if err != nil {
c.HandleErr(errors.Wrap(err, "HTTP server writing event"))
} else {
c.Telemetry().Incr("write", []string{"stream:" + wj.stream})
}
}

type BulkWriteJob struct {
schema []interface{}
objects []interface{}
stream string
}

func makeObjectWithSchema(obj interface{}, schema []interface{}) (simplejson.Object, error) {
object_with_schema := make(simplejson.Object)
object, conv := obj.([]interface{})
if !conv {
return object_with_schema, fmt.Errorf("Failed to convert obj to array")
}
for j, name := range schema {
name, conv := name.(string)
if !conv {
return object_with_schema, fmt.Errorf("%d: Failed to convert schema value to string", j)
}
if len(object) <= j {
return object_with_schema, fmt.Errorf("%d: Not enough values", j)
}
object_with_schema[name] = object[j]
}
return object_with_schema, nil
}

func (wj BulkWriteJob) Run(c Context) {
c.Telemetry().Incr("bulk_write.aggregate", []string{"stream:" + wj.stream})
for i, obj := range wj.objects {
object, err := makeObjectWithSchema(obj, wj.schema)
err = errors.Wrap(err, fmt.Sprintf("HTTP server splitting payload to bulk_write event at %d", i))
if err != nil {
c.HandleErr(err)
return
}
payload, err := json.Marshal(object)
err = errors.Wrap(err, fmt.Sprintf("HTTP server marshalling payload to bulk_write event at %d", i))
if err != nil {
c.HandleErr(err)
return
}
err = persistEvent(wj.stream, payload, "http; bulk_write_job", c)
if err != nil {
c.HandleErr(errors.Wrap(err, "HTTP server bulk_writing events"))
} else {
c.Telemetry().Incr("bulk_write.singular", []string{"stream:" + wj.stream})
}
}
}

func writer(jobs <-chan BackgroundJob, c Context) {
for j := range jobs {
j.Run(c)
}
}

func (hs HttpServer) Run(c Context) error {
handler := c.HttpHandler()
jobs := make(chan WriteJob, 50)
jobs := make(chan BackgroundJob, 50)
for w := 0; w < c.Concurrency(); w++ {
go writer(jobs, c)
}
Expand Down Expand Up @@ -80,6 +141,28 @@ func (hs HttpServer) Run(c Context) error {
ctx.SendJson(root)
}
})
handler.Post("/bulk/stream/:id", func(ctx http.PostContext) {
var err error
stream := ctx.GetSegment("id")
body, err := ctx.JsonBodyParams()
if err != nil {
// Error was already sent
return
}
objects, conv := body["data"].([]interface{})
if !conv {
ctx.ServerErr(fmt.Errorf("'data' field is not an Array (is %v)", reflect.TypeOf(body["data"])))
return
}
schema, conv := body["schema"].([]interface{})
if !conv {
ctx.ServerErr(fmt.Errorf("'schema' field is not an Array (is %v)", reflect.TypeOf(body["schema"])))
return
}

jobs <- BulkWriteJob{stream: stream, objects: objects, schema: schema}
ctx.SendJson("OK")
})
handler.Post("/stream/:id", func(ctx http.PostContext) {
var err error
stream := ctx.GetSegment("id")
Expand Down
35 changes: 35 additions & 0 deletions actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

. "github.com/scoiatael/archai"
"github.com/scoiatael/archai/actions"
"github.com/scoiatael/archai/simplejson"
"github.com/scoiatael/archai/util"

. "github.com/onsi/ginkgo"
Expand Down Expand Up @@ -63,6 +64,40 @@ var _ = Describe("Actions", func() {
action.Stop()
})

Describe("/bulk/stream/:id", func() {
JustBeforeEach(func() {
stream = util.RandomString(10)
address = fmt.Sprintf("%s/bulk/stream/%s", address, stream)
buf = bytes.NewBufferString(`{ "data": [["foo",1,2,3], ["bar",4,5,6]], "schema": ["name", "likes", "shares", "comments"] }`)
})

It("allows writing events", func() {
resp, err := http.Post(address, "application/json", buf)

Expect(err).NotTo(HaveOccurred())
body, err := ioutil.ReadAll(resp.Body)
Expect(err).NotTo(HaveOccurred())
Expect(string(body)).To(Equal(`"OK"`))

time.Sleep(10 * time.Millisecond)

action := actions.ReadEvents{}
action.Amount = 5
action.Stream = stream
err = action.Run(config)
Expect(err).NotTo(HaveOccurred())
Expect(action.Events).NotTo(BeEmpty())
Expect(action.Events).To(HaveLen(2))

js, err := simplejson.Read(action.Events[0].Blob)
Expect(err).NotTo(HaveOccurred())
Expect(js["name"]).To(Equal("foo"))
Expect(js["likes"]).To(Equal(1.0))
Expect(js["shares"]).To(Equal(2.0))
Expect(js["comments"]).To(Equal(3.0))
})
})

Describe("/stream/:id", func() {
JustBeforeEach(func() {
stream = util.RandomString(10)
Expand Down

0 comments on commit ecec5b8

Please sign in to comment.