From 96398fd608456c5170c242e95b39495a05e3554e Mon Sep 17 00:00:00 2001 From: scoiatael Date: Fri, 10 Mar 2017 15:35:49 +0100 Subject: [PATCH] Retry w/ backoff on persistence write fail --- actions/context.go | 3 +++ actions/http_server.go | 57 ++++++++++++++++++++++++------------------ actions_test.go | 2 +- config.go | 10 ++++++++ 4 files changed, 47 insertions(+), 25 deletions(-) diff --git a/actions/context.go b/actions/context.go index 2d6f2bc..1e85fd7 100644 --- a/actions/context.go +++ b/actions/context.go @@ -3,6 +3,7 @@ package actions import ( "context" "encoding/json" + "time" "github.com/scoiatael/archai/http" "github.com/scoiatael/archai/persistence" @@ -24,6 +25,8 @@ type Context interface { HttpHandler() HttpHandler Telemetry() telemetry.Datadog Concurrency() int + Retries() int + Backoff(int) time.Duration } type Action interface { diff --git a/actions/http_server.go b/actions/http_server.go index cc608ee..5249b40 100644 --- a/actions/http_server.go +++ b/actions/http_server.go @@ -3,6 +3,7 @@ package actions import ( "encoding/json" "fmt" + "log" "reflect" "time" @@ -22,11 +23,21 @@ type BackgroundJob interface { } func persistEvent(stream string, payload []byte, origin string, c Context) error { - action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)} - action.Meta["origin"] = origin - action.Meta["compressed"] = "false" - action.Meta["time"] = string(time.Now().Unix()) - return action.Run(c) + var err error + for i := 0; i < c.Retries(); i += 1 { + action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)} + action.Meta["origin"] = origin + action.Meta["compressed"] = "false" + action.Meta["time"] = string(time.Now().Unix()) + err = action.Run(c) + if err == nil { + break + } + time.Sleep(c.Backoff(i)) + c.Telemetry().Incr("persist.retries", []string{"stream" + stream}) + log.Println("Retrying persistEvent, because of", err) + } + return err } type WriteJob struct { @@ -93,7 +104,7 @@ func (wj BulkWriteJob) Run(c Context) { if err != nil { c.HandleErr(errors.Wrap(err, "HTTP server bulk_writing events")) } else { - c.Telemetry().Incr("bulk_write.singular", []string{"stream:" + wj.stream}) + c.Telemetry().Incr("write", []string{"stream:" + wj.stream}) } } } @@ -117,7 +128,6 @@ func (hs HttpServer) Run(c Context) error { session, err := c.Persistence().Session() err = errors.Wrap(err, "Obtaining session failed") if err != nil { - c.HandleErr(err) ctx.ServerErr(err) return } @@ -133,26 +143,25 @@ func (hs HttpServer) Run(c Context) error { action.Amount = ctx.IntParam("amount", 10) err := errors.Wrap(action.Run(c), "HTTP server reading events") if err != nil { - c.HandleErr(err) ctx.ServerErr(err) - } else { - root := make(simplejson.Object) - events := make(simplejson.ObjectArray, len(action.Events)) - for i, ev := range action.Events { - events[i] = make(simplejson.Object) - events[i]["ID"] = ev.ID - payload, err := simplejson.Read(ev.Blob) - err = errors.Wrap(err, "HTTP server marshalling response with read events") - if err != nil { - c.HandleErr(err) - ctx.ServerErr(err) - } - events[i]["blob"] = payload + return + } + root := make(simplejson.Object) + events := make(simplejson.ObjectArray, len(action.Events)) + for i, ev := range action.Events { + events[i] = make(simplejson.Object) + events[i]["ID"] = ev.ID + payload, err := simplejson.Read(ev.Blob) + err = errors.Wrap(err, "HTTP server marshalling response with read events") + if err != nil { + c.HandleErr(err) + ctx.ServerErr(err) } - root["results"] = events - c.Telemetry().Incr("read", []string{"stream:" + stream}) - ctx.SendJson(root) + events[i]["blob"] = payload } + root["results"] = events + c.Telemetry().Incr("read", []string{"stream:" + stream}) + ctx.SendJson(root) }) handler.Post("/bulk/stream/:id", func(ctx http.PostContext) { var err error diff --git a/actions_test.go b/actions_test.go index a8d9048..6598f3c 100644 --- a/actions_test.go +++ b/actions_test.go @@ -79,7 +79,7 @@ var _ = Describe("Actions", func() { Expect(err).NotTo(HaveOccurred()) Expect(string(body)).To(Equal(`"OK"`)) - time.Sleep(10 * time.Millisecond) + time.Sleep(20 * time.Millisecond) action := actions.ReadEvents{} action.Amount = 5 diff --git a/config.go b/config.go index d3085ce..208fa5e 100644 --- a/config.go +++ b/config.go @@ -3,7 +3,9 @@ package main import ( "fmt" "log" + "math" "runtime" + "time" "github.com/scoiatael/archai/actions" "github.com/scoiatael/archai/http" @@ -103,3 +105,11 @@ func (c Config) PrettyPrint() { func (c Config) Concurrency() int { return runtime.NumCPU() } + +func (c Config) Retries() int { + return 3 +} + +func (c Config) Backoff(attempt int) time.Duration { + return time.Duration(math.Pow10(attempt)) * time.Millisecond +}