Skip to content
This repository has been archived by the owner on May 7, 2023. It is now read-only.

Commit

Permalink
Retry w/ backoff on persistence write fail
Browse files Browse the repository at this point in the history
  • Loading branch information
scoiatael committed Mar 10, 2017
1 parent 9e6fa30 commit 96398fd
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 25 deletions.
3 changes: 3 additions & 0 deletions actions/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package actions
import (
"context"
"encoding/json"
"time"

"github.com/scoiatael/archai/http"
"github.com/scoiatael/archai/persistence"
Expand All @@ -24,6 +25,8 @@ type Context interface {
HttpHandler() HttpHandler
Telemetry() telemetry.Datadog
Concurrency() int
Retries() int
Backoff(int) time.Duration
}

type Action interface {
Expand Down
57 changes: 33 additions & 24 deletions actions/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package actions
import (
"encoding/json"
"fmt"
"log"
"reflect"
"time"

Expand All @@ -22,11 +23,21 @@ type BackgroundJob interface {
}

func persistEvent(stream string, payload []byte, origin string, c Context) error {
action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)}
action.Meta["origin"] = origin
action.Meta["compressed"] = "false"
action.Meta["time"] = string(time.Now().Unix())
return action.Run(c)
var err error
for i := 0; i < c.Retries(); i += 1 {
action := WriteEvent{Stream: stream, Payload: payload, Meta: make(map[string]string)}
action.Meta["origin"] = origin
action.Meta["compressed"] = "false"
action.Meta["time"] = string(time.Now().Unix())
err = action.Run(c)
if err == nil {
break
}
time.Sleep(c.Backoff(i))
c.Telemetry().Incr("persist.retries", []string{"stream" + stream})
log.Println("Retrying persistEvent, because of", err)
}
return err
}

type WriteJob struct {
Expand Down Expand Up @@ -93,7 +104,7 @@ func (wj BulkWriteJob) Run(c Context) {
if err != nil {
c.HandleErr(errors.Wrap(err, "HTTP server bulk_writing events"))
} else {
c.Telemetry().Incr("bulk_write.singular", []string{"stream:" + wj.stream})
c.Telemetry().Incr("write", []string{"stream:" + wj.stream})
}
}
}
Expand All @@ -117,7 +128,6 @@ func (hs HttpServer) Run(c Context) error {
session, err := c.Persistence().Session()
err = errors.Wrap(err, "Obtaining session failed")
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
return
}
Expand All @@ -133,26 +143,25 @@ func (hs HttpServer) Run(c Context) error {
action.Amount = ctx.IntParam("amount", 10)
err := errors.Wrap(action.Run(c), "HTTP server reading events")
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
} else {
root := make(simplejson.Object)
events := make(simplejson.ObjectArray, len(action.Events))
for i, ev := range action.Events {
events[i] = make(simplejson.Object)
events[i]["ID"] = ev.ID
payload, err := simplejson.Read(ev.Blob)
err = errors.Wrap(err, "HTTP server marshalling response with read events")
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
}
events[i]["blob"] = payload
return
}
root := make(simplejson.Object)
events := make(simplejson.ObjectArray, len(action.Events))
for i, ev := range action.Events {
events[i] = make(simplejson.Object)
events[i]["ID"] = ev.ID
payload, err := simplejson.Read(ev.Blob)
err = errors.Wrap(err, "HTTP server marshalling response with read events")
if err != nil {
c.HandleErr(err)
ctx.ServerErr(err)
}
root["results"] = events
c.Telemetry().Incr("read", []string{"stream:" + stream})
ctx.SendJson(root)
events[i]["blob"] = payload
}
root["results"] = events
c.Telemetry().Incr("read", []string{"stream:" + stream})
ctx.SendJson(root)
})
handler.Post("/bulk/stream/:id", func(ctx http.PostContext) {
var err error
Expand Down
2 changes: 1 addition & 1 deletion actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("Actions", func() {
Expect(err).NotTo(HaveOccurred())
Expect(string(body)).To(Equal(`"OK"`))

time.Sleep(10 * time.Millisecond)
time.Sleep(20 * time.Millisecond)

action := actions.ReadEvents{}
action.Amount = 5
Expand Down
10 changes: 10 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package main
import (
"fmt"
"log"
"math"
"runtime"
"time"

"github.com/scoiatael/archai/actions"
"github.com/scoiatael/archai/http"
Expand Down Expand Up @@ -103,3 +105,11 @@ func (c Config) PrettyPrint() {
func (c Config) Concurrency() int {
return runtime.NumCPU()
}

func (c Config) Retries() int {
return 3
}

func (c Config) Backoff(attempt int) time.Duration {
return time.Duration(math.Pow10(attempt)) * time.Millisecond
}

0 comments on commit 96398fd

Please sign in to comment.