From e0e30b0c8d1a645addbecd7517d87d3cb60628c6 Mon Sep 17 00:00:00 2001 From: scoiatael Date: Fri, 10 Mar 2017 18:00:45 +0100 Subject: [PATCH] Simplify HttpServer logic * extract background workers * extract handlers * avoid sleeping in tests * remove fast_http adapter * send len of jobs queue to DD --- actions/bulk_write_job.go | 61 +++++++++++ actions/context.go | 1 + actions/handlers/context.go | 16 +++ actions/handlers/get_stream.go | 44 ++++++++ actions/handlers/get_streams.go | 18 ++++ actions/http_server.go | 183 +++++--------------------------- actions/start_worker.go | 25 +++++ actions/write_event.go | 20 ++++ actions/write_job.go | 30 ++++++ actions_test.go | 53 +++++---- archai_suite_test.go | 26 +++++ config.go | 7 ++ http/context.go | 1 + http/fast_http.go | 118 -------------------- http/new.go | 4 - main.go | 1 + persistence/persistence_test.go | 6 +- telemetry/datadog.go | 10 ++ 18 files changed, 317 insertions(+), 307 deletions(-) create mode 100644 actions/bulk_write_job.go create mode 100644 actions/handlers/context.go create mode 100644 actions/handlers/get_stream.go create mode 100644 actions/handlers/get_streams.go create mode 100644 actions/start_worker.go create mode 100644 actions/write_job.go delete mode 100644 http/fast_http.go diff --git a/actions/bulk_write_job.go b/actions/bulk_write_job.go new file mode 100644 index 0000000..96cfae6 --- /dev/null +++ b/actions/bulk_write_job.go @@ -0,0 +1,61 @@ +package actions + +import ( + "encoding/json" + "fmt" + + "github.com/pkg/errors" + "github.com/scoiatael/archai/simplejson" +) + +type BulkWriteJob struct { + Schema []interface{} `json:"schema"` + Objects []interface{} `json:"data"` + Stream string `json:"stream"` +} + +func (wj BulkWriteJob) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]interface{}{"schema": wj.Schema, + "data": wj.Objects, + "stream": wj.Stream, + }) +} + +func makeObjectWithSchema(obj interface{}, schema []interface{}) (simplejson.Object, error) { + object_with_schema := make(simplejson.Object) + object, conv := obj.([]interface{}) + if !conv { + return object_with_schema, fmt.Errorf("Failed to convert obj to array") + } + for j, name := range schema { + name, conv := name.(string) + if !conv { + return object_with_schema, fmt.Errorf("%d: Failed to convert schema value to string", j) + } + if len(object) <= j { + return object_with_schema, fmt.Errorf("%d: Not enough values", j) + } + object_with_schema[name] = object[j] + } + return object_with_schema, nil +} + +func (wj BulkWriteJob) Run(c Context) error { + c.Telemetry().Incr("bulk_write.aggregate.attempt", []string{"stream:" + wj.Stream}) + for i, obj := range wj.Objects { + object, err := makeObjectWithSchema(obj, wj.Schema) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("HTTP server splitting payload to bulk_write event at %d", i)) + } + payload, err := json.Marshal(object) + if err != nil { + return errors.Wrap(err, fmt.Sprintf("HTTP server marshalling payload to bulk_write event at %d", i)) + } + if err := persistEvent(wj.Stream, payload, "http; bulk_write_job", c); err != nil { + return errors.Wrap(err, "HTTP server bulk_writing events") + } + c.Telemetry().Incr("write", []string{"stream:" + wj.Stream}) + } + c.Telemetry().Incr("bulk_write.aggregate.write", []string{"stream:" + wj.Stream}) + return nil +} diff --git a/actions/context.go b/actions/context.go index 1e85fd7..0b3b26b 100644 --- a/actions/context.go +++ b/actions/context.go @@ -18,6 +18,7 @@ type HttpHandler interface { } type Context interface { + BackgroundJobs() chan Action Persistence() persistence.Provider Migrations() map[string]persistence.Migration Version() string diff --git a/actions/handlers/context.go b/actions/handlers/context.go new file mode 100644 index 0000000..883bfb0 --- /dev/null +++ b/actions/handlers/context.go @@ -0,0 +1,16 @@ +package handlers + +import ( + "github.com/scoiatael/archai/telemetry" + "github.com/scoiatael/archai/types" +) + +type Context interface { + ReadEvents(string, string, int) ([]types.Event, error) + ListStreams() ([]string, error) + Telemetry() telemetry.Datadog +} + +type Handler struct { + Context +} diff --git a/actions/handlers/get_stream.go b/actions/handlers/get_stream.go new file mode 100644 index 0000000..8865306 --- /dev/null +++ b/actions/handlers/get_stream.go @@ -0,0 +1,44 @@ +package handlers + +import ( + "github.com/pkg/errors" + "github.com/scoiatael/archai/http" + "github.com/scoiatael/archai/simplejson" + "github.com/scoiatael/archai/types" +) + +func serializeEvents(events []types.Event) (simplejson.Object, error) { + root := make(simplejson.Object) + results := make([]simplejson.Object, len(events)) + cursor := make(simplejson.Object) + for i, ev := range events { + payload, err := simplejson.Read(ev.Blob) + if err != nil { + return root, errors.Wrap(err, "HTTP server marshalling response with read events") + } + results[i] = payload + cursor["next"] = ev.ID + } + root["results"] = results + root["cursor"] = cursor + return root, nil +} + +func (gs Handler) GetStream(ctx http.GetContext) { + stream := ctx.GetSegment("id") + events, err := gs.Context.ReadEvents( + stream, + ctx.StringParam("cursor"), + ctx.IntParam("amount", 10), + ) + if err != nil { + ctx.ServerErr(errors.Wrap(err, "GetStream Handle ReadEvents")) + return + } + json, err := serializeEvents(events) + if err != nil { + ctx.ServerErr(err) + } + gs.Context.Telemetry().Incr("read", []string{"stream:" + stream}) + ctx.SendJson(json) +} diff --git a/actions/handlers/get_streams.go b/actions/handlers/get_streams.go new file mode 100644 index 0000000..171c37a --- /dev/null +++ b/actions/handlers/get_streams.go @@ -0,0 +1,18 @@ +package handlers + +import ( + "github.com/pkg/errors" + "github.com/scoiatael/archai/http" + "github.com/scoiatael/archai/simplejson" +) + +func (gs Handler) GetStreams(ctx http.GetContext) { + streams, err := gs.Context.ListStreams() + if err != nil { + ctx.ServerErr(errors.Wrap(err, "GetStreams Handle .ListStreams")) + return + } + view := make(simplejson.Object) + view["streams"] = streams + ctx.SendJson(view) +} diff --git a/actions/http_server.go b/actions/http_server.go index 5249b40..1f86269 100644 --- a/actions/http_server.go +++ b/actions/http_server.go @@ -3,13 +3,11 @@ package actions import ( "encoding/json" "fmt" - "log" - "reflect" - "time" "github.com/pkg/errors" + "github.com/scoiatael/archai/actions/handlers" "github.com/scoiatael/archai/http" - "github.com/scoiatael/archai/simplejson" + "github.com/scoiatael/archai/types" ) // TODO: This should not be an action. Maybe introduce Job type? @@ -18,171 +16,47 @@ type HttpServer struct { Port int } -type BackgroundJob interface { - Run(c Context) +type HandlerContext struct { + Context } -func persistEvent(stream string, payload []byte, origin string, c Context) error { - var err error - for i := 0; i < c.Retries(); i += 1 { - action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)} - action.Meta["origin"] = origin - action.Meta["compressed"] = "false" - action.Meta["time"] = string(time.Now().Unix()) - err = action.Run(c) - if err == nil { - break - } - time.Sleep(c.Backoff(i)) - c.Telemetry().Incr("persist.retries", []string{"stream" + stream}) - log.Println("Retrying persistEvent, because of", err) - } - return err -} - -type WriteJob struct { - payload simplejson.Object - stream string +func (c HandlerContext) ReadEvents(stream string, cursor string, amount int) ([]types.Event, error) { + re := ReadEvents{Stream: stream, Cursor: cursor, Amount: amount} + err := re.Run(c) + return re.Events, errors.Wrap(err, "HandlerContext ReadEvents .Run") } -func (wj WriteJob) Run(c Context) { - payload, err := json.Marshal(wj.payload) - err = errors.Wrap(err, "HTTP server marshalling payload to write event") +func (c HandlerContext) ListStreams() ([]string, error) { + session, err := c.Persistence().Session() if err != nil { - c.HandleErr(err) - return - } - err = persistEvent(wj.stream, payload, "http; write_job", c) - if err != nil { - c.HandleErr(errors.Wrap(err, "HTTP server writing event")) - } else { - c.Telemetry().Incr("write", []string{"stream:" + wj.stream}) - } -} - -type BulkWriteJob struct { - schema []interface{} - objects []interface{} - stream string -} - -func makeObjectWithSchema(obj interface{}, schema []interface{}) (simplejson.Object, error) { - object_with_schema := make(simplejson.Object) - object, conv := obj.([]interface{}) - if !conv { - return object_with_schema, fmt.Errorf("Failed to convert obj to array") - } - for j, name := range schema { - name, conv := name.(string) - if !conv { - return object_with_schema, fmt.Errorf("%d: Failed to convert schema value to string", j) - } - if len(object) <= j { - return object_with_schema, fmt.Errorf("%d: Not enough values", j) - } - object_with_schema[name] = object[j] - } - return object_with_schema, nil -} - -func (wj BulkWriteJob) Run(c Context) { - c.Telemetry().Incr("bulk_write.aggregate", []string{"stream:" + wj.stream}) - for i, obj := range wj.objects { - object, err := makeObjectWithSchema(obj, wj.schema) - err = errors.Wrap(err, fmt.Sprintf("HTTP server splitting payload to bulk_write event at %d", i)) - if err != nil { - c.HandleErr(err) - return - } - payload, err := json.Marshal(object) - err = errors.Wrap(err, fmt.Sprintf("HTTP server marshalling payload to bulk_write event at %d", i)) - if err != nil { - c.HandleErr(err) - return - } - err = persistEvent(wj.stream, payload, "http; bulk_write_job", c) - if err != nil { - c.HandleErr(errors.Wrap(err, "HTTP server bulk_writing events")) - } else { - c.Telemetry().Incr("write", []string{"stream:" + wj.stream}) - } - } -} - -func writer(jobs <-chan BackgroundJob, c Context) { - for j := range jobs { - j.Run(c) + return []string{}, errors.Wrap(err, "HandlerContext ListStreams .Persistence.Session") } + return session.ListStreams() } func (hs HttpServer) Run(c Context) error { handler := c.HttpHandler() - jobs := make(chan BackgroundJob, 50) - for w := 0; w < c.Concurrency(); w++ { - go writer(jobs, c) - } - handler.Get("/_check", func(ctx http.GetContext) { - ctx.SendJson("OK") - }) - handler.Get("/streams", func(ctx http.GetContext) { - session, err := c.Persistence().Session() - err = errors.Wrap(err, "Obtaining session failed") - if err != nil { - ctx.ServerErr(err) - return - } - streams, err := session.ListStreams() - view := make(simplejson.Object) - view["streams"] = streams - ctx.SendJson(view) - }) - handler.Get("/stream/:id", func(ctx http.GetContext) { - stream := ctx.GetSegment("id") - action := ReadEvents{Stream: stream} - action.Cursor = ctx.StringParam("cursor") - action.Amount = ctx.IntParam("amount", 10) - err := errors.Wrap(action.Run(c), "HTTP server reading events") - if err != nil { - ctx.ServerErr(err) - return - } - root := make(simplejson.Object) - events := make(simplejson.ObjectArray, len(action.Events)) - for i, ev := range action.Events { - events[i] = make(simplejson.Object) - events[i]["ID"] = ev.ID - payload, err := simplejson.Read(ev.Blob) - err = errors.Wrap(err, "HTTP server marshalling response with read events") - if err != nil { - c.HandleErr(err) - ctx.ServerErr(err) - } - events[i]["blob"] = payload - } - root["results"] = events - c.Telemetry().Incr("read", []string{"stream:" + stream}) - ctx.SendJson(root) - }) + jobs := c.BackgroundJobs() + handler_context := handlers.Handler{HandlerContext{c}} + handler.Get("/_check", func(ctx http.GetContext) { ctx.SendJson("OK") }) + handler.Get("/streams", handler_context.GetStreams) + handler.Get("/stream/:id", handler_context.GetStream) handler.Post("/bulk/stream/:id", func(ctx http.PostContext) { var err error stream := ctx.GetSegment("id") - body, err := ctx.JsonBodyParams() + + job := BulkWriteJob{} + err = ctx.ReadJSON(&job) + if err != nil { - // Error was already sent - return - } - objects, conv := body["data"].([]interface{}) - if !conv { - ctx.ServerErr(fmt.Errorf("'data' field is not an Array (is %v)", reflect.TypeOf(body["data"]))) - return - } - schema, conv := body["schema"].([]interface{}) - if !conv { - ctx.ServerErr(fmt.Errorf("'schema' field is not an Array (is %v)", reflect.TypeOf(body["schema"]))) + ctx.ServerErr(fmt.Errorf("Expected body, encountered: %v", err)) return } - jobs <- BulkWriteJob{stream: stream, objects: objects, schema: schema} + job.Stream = stream + + jobs <- job + c.Telemetry().Gauge("jobs.len", []string{}, len(jobs)) ctx.SendJson("OK") }) handler.Post("/stream/:id", func(ctx http.PostContext) { @@ -190,11 +64,12 @@ func (hs HttpServer) Run(c Context) error { stream := ctx.GetSegment("id") body, err := ctx.JsonBodyParams() if err != nil { - // Error was already sent + ctx.ServerErr(fmt.Errorf("Expected body, encountered: %v", err)) return } - jobs <- WriteJob{stream: stream, payload: body} + jobs <- WriteJob{Stream: stream, Payload: body} + c.Telemetry().Gauge("jobs.len", []string{}, len(jobs)) ctx.SendJson("OK") }) diff --git a/actions/start_worker.go b/actions/start_worker.go new file mode 100644 index 0000000..476cb95 --- /dev/null +++ b/actions/start_worker.go @@ -0,0 +1,25 @@ +package actions + +type StartWorker struct { +} + +func writer(jobs <-chan Action, c Context) { + for j := range jobs { + err := j.Run(c) + if err != nil { + c.HandleErr(err) + } + } +} + +func (a StartWorker) Run(c Context) error { + jobs := c.BackgroundJobs() + for w := 0; w < c.Concurrency(); w++ { + go writer(jobs, c) + } + return nil +} + +func (a StartWorker) MarshalJSON() ([]byte, error) { + return []byte(`"Start workers"`), nil +} diff --git a/actions/write_event.go b/actions/write_event.go index 4b38d7a..8b211f2 100644 --- a/actions/write_event.go +++ b/actions/write_event.go @@ -2,6 +2,8 @@ package actions import ( "fmt" + "log" + "time" "github.com/pkg/errors" ) @@ -25,3 +27,21 @@ func (we WriteEvent) Run(c Context) error { func (we WriteEvent) MarshalJSON() ([]byte, error) { return []byte(`"Insert event to Cassandra stream"`), nil } + +func persistEvent(stream string, payload []byte, origin string, c Context) error { + var err error + for i := 0; i < c.Retries(); i += 1 { + action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)} + action.Meta["origin"] = origin + action.Meta["compressed"] = "false" + action.Meta["time"] = string(time.Now().Unix()) + err = action.Run(c) + if err == nil { + break + } + time.Sleep(c.Backoff(i)) + c.Telemetry().Incr("persist.retries", []string{"stream" + stream}) + log.Println("Retrying persistEvent, because of", err) + } + return err +} diff --git a/actions/write_job.go b/actions/write_job.go new file mode 100644 index 0000000..cfb255f --- /dev/null +++ b/actions/write_job.go @@ -0,0 +1,30 @@ +package actions + +import ( + "encoding/json" + + "github.com/pkg/errors" + "github.com/scoiatael/archai/simplejson" +) + +type WriteJob struct { + Payload simplejson.Object `json:"payload"` + Stream string `json:"stream"` +} + +func (wj WriteJob) Run(c Context) error { + payload, err := json.Marshal(wj.Payload) + if err != nil { + return errors.Wrap(err, "WriteJob jsonMarshal payload") + } + + if err := persistEvent(wj.Stream, payload, "http; write_job", c); err != nil { + return errors.Wrap(err, "WriteJob persistEvent") + } + c.Telemetry().Incr("write", []string{"stream:" + wj.Stream}) + return nil +} + +func (wj WriteJob) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]interface{}{"payload": wj.Payload, "stream": wj.Stream}) +} diff --git a/actions_test.go b/actions_test.go index 6598f3c..cbaea96 100644 --- a/actions_test.go +++ b/actions_test.go @@ -9,7 +9,6 @@ import ( "net/http" "time" - . "github.com/scoiatael/archai" "github.com/scoiatael/archai/actions" "github.com/scoiatael/archai/simplejson" "github.com/scoiatael/archai/util" @@ -18,27 +17,6 @@ import ( . "github.com/onsi/gomega" ) -const testingKeyspace = "archai_test" - -var ( - config Config -) - -var _ = BeforeSuite(func() { - config = Config{} - config.Hosts = []string{"127.0.0.1"} - config.Keyspace = testingKeyspace - config.StatsdAddr = "dd-agent.service.consul:8125" - err := config.Init() - if err != nil { - panic(err) - } -}) - -var _ = AfterSuite(func() { - -}) - var _ = Describe("Actions", func() { Describe("HttpServer", func() { var ( @@ -79,7 +57,8 @@ var _ = Describe("Actions", func() { Expect(err).NotTo(HaveOccurred()) Expect(string(body)).To(Equal(`"OK"`)) - time.Sleep(20 * time.Millisecond) + write := <-config.BackgroundJobs() + write.Run(config) action := actions.ReadEvents{} action.Amount = 5 @@ -102,7 +81,7 @@ var _ = Describe("Actions", func() { JustBeforeEach(func() { stream = util.RandomString(10) address = fmt.Sprintf("%s/stream/%s", address, stream) - buf = bytes.NewBufferString(`{ "foo": "bar" }`) + buf = bytes.NewBufferString(`{ "foo": "bar", "baz": 2 }`) }) It("allows writing events", func() { @@ -112,6 +91,22 @@ var _ = Describe("Actions", func() { body, err := ioutil.ReadAll(resp.Body) Expect(err).NotTo(HaveOccurred()) Expect(string(body)).To(Equal(`"OK"`)) + + write := <-config.BackgroundJobs() + write.Run(config) + + action := actions.ReadEvents{} + action.Amount = 5 + action.Stream = stream + err = action.Run(config) + Expect(err).NotTo(HaveOccurred()) + Expect(action.Events).NotTo(BeEmpty()) + Expect(action.Events).To(HaveLen(1)) + + js, err := simplejson.Read(action.Events[0].Blob) + Expect(err).NotTo(HaveOccurred()) + Expect(js["foo"]).To(Equal("bar")) + Expect(js["baz"]).To(Equal(2.0)) }) Context("After some event was written", func() { @@ -125,7 +120,7 @@ var _ = Describe("Actions", func() { }) - get := func(query string) interface{} { + get := func(query string) map[string]interface{} { resp, err := http.Get(address + query) Expect(err).NotTo(HaveOccurred()) @@ -134,18 +129,18 @@ var _ = Describe("Actions", func() { js := make(map[string]interface{}) err = json.Unmarshal(body, &js) Expect(err).NotTo(HaveOccurred()) - return js["results"] + return js } It("allows reading events", func() { - results := get("") + results := get("")["results"] Expect(results).NotTo(BeEmpty()) Expect(results).To(HaveLen(1)) }) It("allows reading events with cursor", func() { - cursor := get("").([]interface{})[0].(map[string]interface{})["ID"].(string) + cursor := get("")["cursor"].(map[string]interface{})["next"].(string) - results := get("?cursor=" + cursor) + results := get("?cursor=" + cursor)["results"] Expect(results).To(BeEmpty()) }) }) diff --git a/archai_suite_test.go b/archai_suite_test.go index 590f0c0..8e0c245 100644 --- a/archai_suite_test.go +++ b/archai_suite_test.go @@ -5,8 +5,34 @@ import ( . "github.com/onsi/gomega" "testing" + + . "github.com/scoiatael/archai" +) + +const testingKeyspace = "archai_test" + +var ( + config Config ) +var _ = BeforeSuite(func() { + config = Config{} + config.Features = make(map[string]bool) + config.Hosts = []string{"127.0.0.1"} + // NOTE: makes it far easier to spot panics, but throws a log of noise otherwise + // config.Features["dev_logger"] = true + config.Keyspace = testingKeyspace + config.StatsdAddr = "dd-agent.service.consul:8125" + err := config.Init() + if err != nil { + panic(err) + } +}) + +var _ = AfterSuite(func() { + +}) + func TestArchai(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Archai Suite") diff --git a/config.go b/config.go index 208fa5e..16e80c6 100644 --- a/config.go +++ b/config.go @@ -25,6 +25,7 @@ type Config struct { provider persistence.Provider telemetry telemetry.Datadog + jobs chan actions.Action initialized bool } @@ -81,6 +82,8 @@ func (c *Config) Init() error { dd := telemetry.NewDatadog(c.StatsdAddr, "archai", c.Keyspace) c.telemetry = dd + c.jobs = make(chan actions.Action, 50) + c.initialized = true return nil } @@ -113,3 +116,7 @@ func (c Config) Retries() int { func (c Config) Backoff(attempt int) time.Duration { return time.Duration(math.Pow10(attempt)) * time.Millisecond } + +func (c Config) BackgroundJobs() chan actions.Action { + return c.jobs +} diff --git a/http/context.go b/http/context.go index 347802c..e8b49a9 100644 --- a/http/context.go +++ b/http/context.go @@ -23,4 +23,5 @@ type GetContext interface { type PostContext interface { HttpContext JsonBodyParams() (simplejson.Object, error) + ReadJSON(interface{}) error } diff --git a/http/fast_http.go b/http/fast_http.go deleted file mode 100644 index 42214fb..0000000 --- a/http/fast_http.go +++ /dev/null @@ -1,118 +0,0 @@ -package http - -import ( - "encoding/json" - "fmt" - "strconv" - "strings" - - "github.com/pkg/errors" - "github.com/valyala/fasthttp" - - "github.com/scoiatael/archai/simplejson" -) - -type FastHttpContext struct { - *fasthttp.RequestCtx - Context Context -} - -func (hc FastHttpContext) ServerErr(err error) { - hc.Context.HandleErr(err) - hc.Error(fmt.Sprintf(`{ "error": "%v" }`, err), fasthttp.StatusInternalServerError) -} - -func (hc FastHttpContext) SendJson(response interface{}) { - dump, err := json.Marshal(response) - if err != nil { - hc.ServerErr(err) - } else { - hc.SetBody(dump) - } -} - -// TODO: Do this normal way -func (hc FastHttpContext) GetSegment(index string) string { - segments := strings.Split(string(hc.Path()), "/") - if len(segments) == 0 { - return "" - } else { - return segments[len(segments)-1] - } -} - -type FastHttpGetContext struct { - FastHttpContext -} - -func (gc FastHttpGetContext) StringParam(name string) string { - val := gc.QueryArgs().Peek(name) - return string(val) -} - -func (gc FastHttpGetContext) IntParam(name string, def int) int { - val := gc.QueryArgs().Peek(name) - i, err := strconv.Atoi(string(val)) - if err != nil { - return def - } - return i -} - -type FastHttpPostContext struct { - FastHttpContext -} - -const expectedJSON = `{ "error": "expected JSON body" }` - -func (pc FastHttpPostContext) JsonBodyParams() (simplejson.Object, error) { - body := pc.PostBody() - read := make(simplejson.Object) - err := json.Unmarshal(body, &read) - if err != nil { - pc.Error(expectedJSON, fasthttp.StatusBadRequest) - } - return read, err -} - -// TODO: Add routing ;) -type FastHttpHandlers struct { - POST func(PostContext) - GET func(GetContext) -} - -type FastHttpHandler struct { - handlers FastHttpHandlers - Context Context -} - -func (h *FastHttpHandler) Get(path string, handler func(GetContext)) { - h.handlers.GET = handler -} - -func (h *FastHttpHandler) Post(path string, handler func(PostContext)) { - h.handlers.POST = handler -} - -const ( - methodNotAllowed = `{ "error": "method not allowed" }` - contentType = `application/json` -) - -func (h *FastHttpHandler) compile() func(*fasthttp.RequestCtx) { - return func(ctx *fasthttp.RequestCtx) { - ctx.SetContentType(contentType) - httpCtx := FastHttpContext{ctx, h.Context} - if ctx.IsPost() { - h.handlers.POST(FastHttpPostContext{httpCtx}) - } else if ctx.IsGet() { - h.handlers.GET(FastHttpGetContext{httpCtx}) - } else { - ctx.Error(methodNotAllowed, fasthttp.StatusMethodNotAllowed) - } - } -} - -func (h *FastHttpHandler) Run(addr string) error { - return errors.Wrap(fasthttp.ListenAndServe(addr, h.compile()), "Starting fasthttp server") -} diff --git a/http/new.go b/http/new.go index 56c76ec..8bac543 100644 --- a/http/new.go +++ b/http/new.go @@ -24,7 +24,3 @@ func NewIris(c Context, useDevLogger bool) *IrisHandler { handler.framework = app return &handler } - -func NewFastHttp(c Context) *FastHttpHandler { - return &FastHttpHandler{Context: c} -} diff --git a/main.go b/main.go index 5fa7c4c..e6b6647 100644 --- a/main.go +++ b/main.go @@ -68,6 +68,7 @@ func main() { if c.Bool("migrate") { config.Append(actions.Migrate{}) } + config.Append(actions.StartWorker{}) if c.Bool("list-streams") { config.Append(actions.ListStreams{}) } diff --git a/persistence/persistence_test.go b/persistence/persistence_test.go index 3543453..b04b389 100644 --- a/persistence/persistence_test.go +++ b/persistence/persistence_test.go @@ -66,8 +66,10 @@ var _ = BeforeSuite(func() { }) var _ = AfterSuite(func() { - root_sess.Query(fmt.Sprintf("truncate table %s.events", testingKeyspace)).Exec() - root_sess.Close() + if root_sess != nil { + root_sess.Query(fmt.Sprintf("truncate table %s.events", testingKeyspace)).Exec() + root_sess.Close() + } }) var _ = Describe("Persistence", func() { diff --git a/telemetry/datadog.go b/telemetry/datadog.go index 68333cb..7518e33 100644 --- a/telemetry/datadog.go +++ b/telemetry/datadog.go @@ -10,6 +10,7 @@ import ( type Datadog interface { Incr(name string, tags []string) Failure(title, text string) + Gauge(name string, tags []string, value int) } type Client struct { @@ -42,6 +43,15 @@ func (c *Client) Incr(name string, tags []string) { } } +func (c *Client) Gauge(name string, tags []string, value int) { + if c.initialized { + err := c.client.Gauge(name, float64(value), tags, 1.0) + if err != nil { + c.on_error(err) + } + } +} + func NewDatadog(addr string, namespace string, keyspace string) Datadog { c, err := statsd.New(addr) if err != nil {