diff --git a/actions/http_server.go b/actions/http_server.go
index b3408a1..b24a0f6 100644
--- a/actions/http_server.go
+++ b/actions/http_server.go
@@ -3,39 +3,49 @@ package actions
 import (
 	"encoding/json"
 	"fmt"
+	"reflect"
+	"strconv"
+	"time"
 
 	"github.com/pkg/errors"
 	"github.com/scoiatael/archai/http"
 	"github.com/scoiatael/archai/simplejson"
 )
 
+// TODO: This should not be an action. Maybe introduce Job type?
 type HttpServer struct {
 	Addr string
 	Port int
 }
 
+// BackgroundJob is a unit of work executed by the writer goroutines.
+type BackgroundJob interface {
+	Run(c Context)
+}
+
+// persistEvent stores payload on stream via WriteEvent, tagging it with origin
+// and the current Unix time (decimal seconds) in the event metadata.
+func persistEvent(stream string, payload []byte, origin string, c Context) error {
+	action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)}
+	action.Meta["origin"] = origin
+	action.Meta["compressed"] = "false"
+	action.Meta["time"] = strconv.FormatInt(time.Now().Unix(), 10)
+	return action.Run(c)
+}
+
 type WriteJob struct {
 	payload simplejson.Object
 	stream  string
 }
 
-func writer(jobs <-chan WriteJob, c Context) {
-	for j := range jobs {
-		j.Run(c)
-	}
-}
-
-func (wj *WriteJob) Run(c Context) {
+func (wj WriteJob) Run(c Context) {
 	payload, err := json.Marshal(wj.payload)
 	err = errors.Wrap(err, "HTTP server marshalling payload to write event")
 	if err != nil {
 		c.HandleErr(err)
 		return
 	}
-	action := WriteEvent{Stream: wj.stream, Payload: payload, Meta: make(map[string]string)}
-	action.Meta["origin"] = "http"
-	action.Meta["compressed"] = "false"
-	err = action.Run(c)
+	err = persistEvent(wj.stream, payload, "http; write_job", c)
 	if err != nil {
 		c.HandleErr(errors.Wrap(err, "HTTP server writing event"))
 	} else {
@@ -43,9 +53,73 @@ func (wj *WriteJob) Run(c Context) {
 	}
 }
 
+// BulkWriteJob writes one event per row of objects to stream, naming each
+// row's values after the corresponding entries of schema.
+type BulkWriteJob struct {
+	schema  []interface{}
+	objects []interface{}
+	stream  string
+}
+
+// makeObjectWithSchema pairs the values of obj (which must be an array) with
+// the names in schema; it fails if a schema entry is not a string or obj has
+// fewer values than schema has names.
+func makeObjectWithSchema(obj interface{}, schema []interface{}) (simplejson.Object, error) {
+	object_with_schema := make(simplejson.Object)
+	object, conv := obj.([]interface{})
+	if !conv {
+		return object_with_schema, fmt.Errorf("Failed to convert obj to array")
+	}
+	for j, name := range schema {
+		name, conv := name.(string)
+		if !conv {
+			return object_with_schema, fmt.Errorf("%d: Failed to convert schema value to string", j)
+		}
+		if len(object) <= j {
+			return object_with_schema, fmt.Errorf("%d: Not enough values", j)
+		}
+		object_with_schema[name] = object[j]
+	}
+	return object_with_schema, nil
+}
+
+// Run persists every object as its own event. A conversion or marshalling
+// failure aborts the remaining objects; a failed write is reported and the
+// loop moves on to the next object.
+func (wj BulkWriteJob) Run(c Context) {
+	c.Telemetry().Incr("bulk_write.aggregate", []string{"stream:" + wj.stream})
+	for i, obj := range wj.objects {
+		object, err := makeObjectWithSchema(obj, wj.schema)
+		err = errors.Wrap(err, fmt.Sprintf("HTTP server splitting payload to bulk_write event at %d", i))
+		if err != nil {
+			c.HandleErr(err)
+			return
+		}
+		payload, err := json.Marshal(object)
+		err = errors.Wrap(err, fmt.Sprintf("HTTP server marshalling payload to bulk_write event at %d", i))
+		if err != nil {
+			c.HandleErr(err)
+			return
+		}
+		err = persistEvent(wj.stream, payload, "http; bulk_write_job", c)
+		if err != nil {
+			c.HandleErr(errors.Wrap(err, "HTTP server bulk_writing events"))
+		} else {
+			c.Telemetry().Incr("bulk_write.singular", []string{"stream:" + wj.stream})
+		}
+	}
+}
+
+// writer runs jobs received from the channel until it is closed.
+func writer(jobs <-chan BackgroundJob, c Context) {
+	for j := range jobs {
+		j.Run(c)
+	}
+}
+
 func (hs HttpServer) Run(c Context) error {
 	handler := c.HttpHandler()
-	jobs := make(chan WriteJob, 50)
+	jobs := make(chan BackgroundJob, 50)
 	for w := 0; w < c.Concurrency(); w++ {
 		go writer(jobs, c)
 	}
@@ -80,6 +154,28 @@ func (hs HttpServer) Run(c Context) error {
 			ctx.SendJson(root)
 		}
 	})
+	handler.Post("/bulk/stream/:id", func(ctx http.PostContext) {
+		var err error
+		stream := ctx.GetSegment("id")
+		body, err := ctx.JsonBodyParams()
+		if err != nil {
+			// Error was already sent
+			return
+		}
+		objects, conv := body["data"].([]interface{})
+		if !conv {
+			ctx.ServerErr(fmt.Errorf("'data' field is not an Array (is %v)", reflect.TypeOf(body["data"])))
+			return
+		}
+		schema, conv := body["schema"].([]interface{})
+		if !conv {
+			ctx.ServerErr(fmt.Errorf("'schema' field is not an Array (is %v)", reflect.TypeOf(body["schema"])))
+			return
+		}
+
+		jobs <- BulkWriteJob{stream: stream, objects: objects, schema: schema}
+		ctx.SendJson("OK")
+	})
 	handler.Post("/stream/:id", func(ctx http.PostContext) {
 		var err error
 		stream := ctx.GetSegment("id")
diff --git a/actions_test.go b/actions_test.go
index a13b394..a8d9048 100644
--- a/actions_test.go
+++ b/actions_test.go
@@ -11,6 +11,7 @@ import (
 
 	. "github.com/scoiatael/archai"
 	"github.com/scoiatael/archai/actions"
+	"github.com/scoiatael/archai/simplejson"
 	"github.com/scoiatael/archai/util"
 
 	. "github.com/onsi/ginkgo"
@@ -63,6 +64,40 @@ var _ = Describe("Actions", func() {
 		action.Stop()
 	})
 
+	Describe("/bulk/stream/:id", func() {
+		JustBeforeEach(func() {
+			stream = util.RandomString(10)
+			address = fmt.Sprintf("%s/bulk/stream/%s", address, stream)
+			buf = bytes.NewBufferString(`{ "data": [["foo",1,2,3], ["bar",4,5,6]], "schema": ["name", "likes", "shares", "comments"] }`)
+		})
+
+		It("allows writing events", func() {
+			resp, err := http.Post(address, "application/json", buf)
+
+			Expect(err).NotTo(HaveOccurred())
+			body, err := ioutil.ReadAll(resp.Body)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(string(body)).To(Equal(`"OK"`))
+
+			time.Sleep(10 * time.Millisecond)
+
+			action := actions.ReadEvents{}
+			action.Amount = 5
+			action.Stream = stream
+			err = action.Run(config)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(action.Events).NotTo(BeEmpty())
+			Expect(action.Events).To(HaveLen(2))
+
+			js, err := simplejson.Read(action.Events[0].Blob)
+			Expect(err).NotTo(HaveOccurred())
+			Expect(js["name"]).To(Equal("foo"))
+			Expect(js["likes"]).To(Equal(1.0))
+			Expect(js["shares"]).To(Equal(2.0))
+			Expect(js["comments"]).To(Equal(3.0))
+		})
+	})
+
 	Describe("/stream/:id", func() {
 		JustBeforeEach(func() {
 			stream = util.RandomString(10)