Skip to content

Commit

Permalink
Implement a more advanced client.
Browse files Browse the repository at this point in the history
Make client aware of the state of all cluster nodes.
Add topology discovery requests.
Add healthcheks.
Add primary/secondary request routing.
Add round-robin endpoint selector for reads.
Add pluggable request retriers.
Add pluggable backoff retrying policies.
Add pluggable read preference option.
Refactor client configuration.
  • Loading branch information
aalda committed Mar 15, 2019
1 parent 166c172 commit 3f2cfbe
Show file tree
Hide file tree
Showing 23 changed files with 2,157 additions and 205 deletions.
28 changes: 10 additions & 18 deletions api/apihttp/apihttp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package apihttp

import (
"bytes"
"encoding/json"
"net/http"
"time"
Expand All @@ -37,33 +36,26 @@ type HealthCheckResponse struct {

// HealthCheckHandler checks the system status and returns it accordinly.
// The http call it answer is:
// GET /health-check
// HEAD /
//
// The following statuses are expected:
//
// If everything is alright, the HTTP status is 200 and the body contains:
// {"version": "0", "status":"ok"}
// If everything is alright, the HTTP response will have a 204 status code
// and no body.
func HealthCheckHandler(w http.ResponseWriter, r *http.Request) {

metrics.QedAPIHealthcheckRequestsTotal.Inc()

result := HealthCheckResponse{
Version: 0,
Status: "ok",
// Make sure we can only be called with an HTTP POST request.
if r.Method != "HEAD" {
w.Header().Set("Allow", "HEAD")
w.WriteHeader(http.StatusMethodNotAllowed)
return
}

resultJson, _ := json.Marshal(result)

// A very simple health check.
w.WriteHeader(http.StatusOK)
w.Header().Set("Content-Type", "application/json")

// In the future we could report back on the status of our DB, or our cache
// (e.g. Redis) by performing a simple PING, and include them in the response.
out := new(bytes.Buffer)
_ = json.Compact(out, resultJson)
w.WriteHeader(http.StatusNoContent)

_, _ = w.Write(out.Bytes())
}

// Add posts an event into the system:
Expand Down Expand Up @@ -304,7 +296,7 @@ func AuthHandlerMiddleware(handler http.HandlerFunc) http.HandlerFunc {
func NewApiHttp(balloon raftwal.RaftBalloonApi) *http.ServeMux {

api := http.NewServeMux()
api.HandleFunc("/health-check", AuthHandlerMiddleware(HealthCheckHandler))
api.HandleFunc("/healthcheck", AuthHandlerMiddleware(HealthCheckHandler))
api.HandleFunc("/events", AuthHandlerMiddleware(Add(balloon)))
api.HandleFunc("/proofs/membership", AuthHandlerMiddleware(Membership(balloon)))
api.HandleFunc("/proofs/digest-membership", AuthHandlerMiddleware(DigestMembership(balloon)))
Expand Down
11 changes: 5 additions & 6 deletions api/apihttp/apihttp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (b fakeRaftBalloon) Info() map[string]interface{} {
func TestHealthCheckHandler(t *testing.T) {
// Create a request to pass to our handler. We don't have any query parameters for now, so we'll
// pass 'nil' as the third parameter.
req, err := http.NewRequest("GET", "/health-check", nil)
req, err := http.NewRequest("HEAD", "/healthcheck", nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -111,16 +111,15 @@ func TestHealthCheckHandler(t *testing.T) {
handler.ServeHTTP(rr, req)

// Check the status code is what we expect.
if status := rr.Code; status != http.StatusOK {
if status := rr.Code; status != http.StatusNoContent {
t.Errorf("handler returned wrong status code: got %v want %v",
status, http.StatusOK)
status, http.StatusNoContent)
}

// Check the response body is what we expect.
expected := `{"version":0,"status":"ok"}`
if rr.Body.String() != expected {
if rr.Body.String() != "" {
t.Errorf("handler returned unexpected body: got %v want %v",
rr.Body.String(), expected)
rr.Body.String(), "")
}
}

Expand Down
120 changes: 120 additions & 0 deletions client/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"math"
"math/rand"
"sync"
"time"
)

// BackoffF specifies the signature of a function that returns the
// time to wait before the next call to a resource. To stop retrying
// return false in the 2nd return value.
type BackoffF func(attempt int) (time.Duration, bool)

// Backoff allows callers to implement their own Backoff strategy.
type Backoff interface {
// Next implements a BackoffF.
Next(attempt int) (time.Duration, bool)
}

// StopBackoff is a fixed backoff policy that always returns false for
// Next(), meaning that the operation should never be retried.
type StopBackoff struct{}

// NewStopBackoff returns a new StopBackoff.
func NewStopBackoff() *StopBackoff {
return &StopBackoff{}
}

// Next implements BackoffF for StopBackoff.
func (b StopBackoff) Next(attempt int) (time.Duration, bool) {
return 0, false
}

// ConstantBackoff is a backoff policy that always returns the same delay.
type ConstantBackoff struct {
interval time.Duration
}

// NewConstantBackoff returns a new ConstantBackoff.
func NewConstantBackoff(interval time.Duration) *ConstantBackoff {
return &ConstantBackoff{interval: interval}
}

// Next implements BackoffF for ConstantBackoff.
func (b *ConstantBackoff) Next(attempt int) (time.Duration, bool) {
return b.interval, true
}

// SimpleBackoff takes a list of fixed values for backoff intervals.
// Each call to Next returns the next value from that fixed list.
// After each value is returned, subsequent calls to Next will only return
// the last element.
type SimpleBackoff struct {
sync.Mutex
ticks []int
}

// NewSimpleBackoff creates a SimpleBackoff algorithm with the specified
// list of fixed intervals in milliseconds.
func NewSimpleBackoff(ticks ...int) *SimpleBackoff {
return &SimpleBackoff{ticks: ticks}
}

// Next implements BackoffF for SimpleBackoff.
func (b *SimpleBackoff) Next(attempt int) (time.Duration, bool) {
b.Lock()
defer b.Unlock()
if attempt >= len(b.ticks) {
return 0, false
}
ms := b.ticks[attempt]
return time.Duration(ms) * time.Millisecond, true
}

// ExponentialBackoff implements the simple exponential backoff described by
// Douglas Thain at http://dthain.blogspot.de/2009/02/exponential-backoff-in-distributed.html.
type ExponentialBackoff struct {
t float64 // initial timeout (in msec)
f float64 // exponential factor (e.g. 2)
m float64 // maximum timeout (in msec)
}

// NewExponentialBackoff returns a ExponentialBackoff backoff policy.
// Use initialTimeout to set the first/minimal interval
// and maxTimeout to set the maximum wait interval.
func NewExponentialBackoff(initialTimeout, maxTimeout time.Duration) *ExponentialBackoff {
return &ExponentialBackoff{
t: float64(int64(initialTimeout / time.Millisecond)),
f: 2.0,
m: float64(int64(maxTimeout / time.Millisecond)),
}
}

// Next implements BackoffF for ExponentialBackoff.
func (b *ExponentialBackoff) Next(attempt int) (time.Duration, bool) {
r := 1.0 + rand.Float64() // random number in [1..2]
m := math.Min(r*b.t*math.Pow(b.f, float64(attempt)), b.m)
if m >= b.m {
return 0, false
}
d := time.Duration(int64(m)) * time.Millisecond
return d, true
}
112 changes: 112 additions & 0 deletions client/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2018 Banco Bilbao Vizcaya Argentaria, S.A.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
"math/rand"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestStopBackoff(t *testing.T) {
b := NewStopBackoff()
_, ok := b.Next(0)
require.False(t, ok)
}

func TestConstantBackoff(t *testing.T) {
b := NewConstantBackoff(time.Second)
d, ok := b.Next(0)
require.True(t, ok)
require.Equal(t, time.Second, d)
}

func TestSimpleBackoff(t *testing.T) {

testCases := []struct {
Duration time.Duration
Continue bool
}{
{
Duration: 1 * time.Millisecond,
Continue: true,
},
{
Duration: 2 * time.Millisecond,
Continue: true,
},
{
Duration: 7 * time.Millisecond,
Continue: true,
},
{
Duration: 0,
Continue: false,
},
{
Duration: 0,
Continue: false,
},
}

b := NewSimpleBackoff(1, 2, 7)

for i, c := range testCases {
d, ok := b.Next(i)
require.Equalf(t, c.Continue, ok, "The continue value should match for test case %d", i)
require.Equalf(t, c.Duration, d, "The duration value should match for test case %d", i)
}
}

func TestExponentialBackoff(t *testing.T) {

rand.Seed(time.Now().UnixNano())

min := time.Duration(8) * time.Millisecond
max := time.Duration(256) * time.Millisecond
b := NewExponentialBackoff(min, max)

between := func(value time.Duration, a, b int) bool {
x := int(value / time.Millisecond)
return a <= x && x <= b
}

d, ok := b.Next(0)
require.True(t, ok)
require.True(t, between(d, 8, 256))

d, ok = b.Next(1)
require.True(t, ok)
require.True(t, between(d, 8, 256))

d, ok = b.Next(3)
require.True(t, ok)
require.True(t, between(d, 8, 256))

d, ok = b.Next(4)
require.True(t, ok)
require.True(t, between(d, 8, 256))

_, ok = b.Next(5)
require.False(t, ok)

_, ok = b.Next(6)
require.False(t, ok)

}
Loading

0 comments on commit 3f2cfbe

Please sign in to comment.