Skip to content

Commit

Permalink
Add retries when checking topology.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jose Luis Lucas authored and iknite committed Feb 21, 2019
1 parent c7b019b commit e8a4a1c
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 104 deletions.
95 changes: 50 additions & 45 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,18 @@ func NewHTTPClient(conf Config) *HTTPClient {
TLSHandshakeTimeout: 5 * time.Second,
},
},
Topology{},
Topology{
Leader: conf.Endpoints[0],
Endpoints: conf.Endpoints,
},
}

// Initial topology assignment
client.topology.Leader = conf.Endpoints[0]
client.topology.Endpoints = conf.Endpoints

info, err := client.getClusterInfo()
if err != nil {
log.Errorf("Failed to get raft cluster info: %v", err)
log.Errorf("Failed to get raft cluster info. Error: %v", err)
return nil
}

client.updateTopology(info)

return client
Expand All @@ -87,25 +87,11 @@ func (c *HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, erro
var retries uint

for {
url, err := url.Parse(c.topology.Leader + path)
if err != nil {
return nil, err //panic(err)
}

req, err := http.NewRequest(method, fmt.Sprintf("%s", url), bytes.NewBuffer(data))
if err != nil {
return nil, err //panic(err)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.conf.APIKey)

resp, err := c.Do(req)
if err != nil {
if retries == 5 {
return nil, err
}
c.topology.Leader = c.topology.Endpoints[rand.Intn(len(c.topology.Endpoints))]
retries = retries + 1
delay := time.Duration(10 << retries * time.Millisecond)
time.Sleep(delay)
Expand All @@ -116,19 +102,32 @@ func (c *HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, erro
}

func (c HTTPClient) getClusterInfo() (map[string]interface{}, error) {
var retries uint
info := make(map[string]interface{})

body, err := c.doReq("GET", "/info/shards", []byte{})
if err != nil {
return info, err
}
for {
body, err := c.doReq("GET", "/info/shards", []byte{})

if err != nil {
log.Debugf("Failed to get raft cluster info through server %s. Error: %v",
c.topology.Leader, err)
if retries == 5 {
return nil, err
}
c.topology.Leader = c.topology.Endpoints[rand.Intn(len(c.topology.Endpoints))]
retries = retries + 1
delay := time.Duration(10 << retries * time.Millisecond)
time.Sleep(delay)
continue
}

err = json.Unmarshal(body, &info)
if err != nil {
return nil, err
}

err = json.Unmarshal(body, &info)
if err != nil {
return info, err
}

return info, nil
}

func (c *HTTPClient) updateTopology(info map[string]interface{}) {
Expand Down Expand Up @@ -161,16 +160,31 @@ func (c *HTTPClient) updateTopology(info map[string]interface{}) {

func (c *HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {

resp, err := c.exponentialBackoff(method, path, data)
url, err := url.Parse(c.topology.Leader + path)
if err != nil {
return nil, err //panic(err)
}

req, err := http.NewRequest(method, fmt.Sprintf("%s", url), bytes.NewBuffer(data))
if err != nil {
return nil, err //panic(err)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.conf.APIKey)

resp, err := c.exponentialBackoff(req)
if err != nil {
return nil, err
// NetworkTransport error. Check topology info
}
defer resp.Body.Close()

bodyBytes, _ := ioutil.ReadAll(resp.Body)

if resp.StatusCode >= 500 {
return nil, fmt.Errorf("Server error: %v", string(bodyBytes))
// Non Leader error. Check topology info.
}

if resp.StatusCode >= 400 && resp.StatusCode < 500 {
Expand All @@ -180,23 +194,14 @@ func (c *HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {
return bodyBytes, nil
}

func (c *HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, error) {

var retries uint

for {
resp, err := c.Do(req)
if err != nil {
if retries == 5 {
return nil, err
}
retries = retries + 1
delay := time.Duration(10 << retries * time.Millisecond)
time.Sleep(delay)
continue
}
return resp, err
// Ping will do a request to the server
func (c HTTPClient) Ping() error {
_, err := c.doReq("GET", "/health-check", nil)
if err != nil {
return err
}

return nil
}

// Add will do a request to the server with a post data to store a new event.
Expand Down
102 changes: 52 additions & 50 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,35 +30,34 @@ import (
"github.com/stretchr/testify/assert"
)

var (
client *HTTPClient
mux *http.ServeMux
server *httptest.Server
)

func init() {
log.SetLogger("client-test", "info")
}

func setup() func() {
mux = http.NewServeMux()
server = httptest.NewServer(mux)
func setupServer(input []byte) (string, func()) {
mux := http.NewServeMux()
server := httptest.NewServer(mux)

mux.HandleFunc("/info/shards", infoHandler(server.URL))
mux.HandleFunc("/events", defaultHandler(input))
mux.HandleFunc("/proofs/membership", defaultHandler(input))
mux.HandleFunc("/proofs/incremental", defaultHandler(input))
mux.HandleFunc("/proofs/digest-membership", defaultHandler(input))

return server.URL, func() {
server.Close()
}
}

client = NewHTTPClient(Config{
Endpoints: []string{server.URL},
func setupClient(urls []string) *HTTPClient {
return NewHTTPClient(Config{
Endpoints: urls,
APIKey: "my-awesome-api-key",
Insecure: false,
})
return func() {
server.Close()
}
}

func TestAddSuccess(t *testing.T) {
tearDown := setup()
defer tearDown()

event := "Hello world!"
snap := &protocol.Snapshot{
Expand All @@ -67,32 +66,28 @@ func TestAddSuccess(t *testing.T) {
Version: 0,
EventDigest: []byte(event),
}
input, _ := json.Marshal(snap)

result, _ := json.Marshal(snap)
mux.HandleFunc("/events", okHandler(result))
serverURL, tearDown := setupServer(input)
defer tearDown()
client := setupClient([]string{serverURL})

snapshot, err := client.Add(event)
assert.NoError(t, err)
assert.Equal(t, snap, snapshot, "The snapshots should match")

}

func TestAddWithServerFailure(t *testing.T) {
tearDown := setup()
serverURL, tearDown := setupServer(nil)
defer tearDown()
client := setupClient([]string{serverURL})

event := "Hello world!"
mux.HandleFunc("/events", serverErrorHandler())

_, err := client.Add(event)
assert.Error(t, err)

}

func TestMembership(t *testing.T) {
tearDown := setup()
defer tearDown()

event := "Hello world!"
version := uint64(0)
fakeResult := &protocol.MembershipResult{
Expand All @@ -105,17 +100,18 @@ func TestMembership(t *testing.T) {
QueryVersion: version,
ActualVersion: version,
}
resultJSON, _ := json.Marshal(fakeResult)
mux.HandleFunc("/proofs/membership", okHandler(resultJSON))
inputJSON, _ := json.Marshal(fakeResult)

serverURL, tearDown := setupServer(inputJSON)
defer tearDown()
client := setupClient([]string{serverURL})

result, err := client.Membership([]byte(event), version)
assert.NoError(t, err)
assert.Equal(t, fakeResult, result, "The results should match")
assert.Equal(t, fakeResult, result, "The inputs should match")
}

func TestDigestMembership(t *testing.T) {
tearDown := setup()
defer tearDown()

event := "Hello world!"
version := uint64(0)
Expand All @@ -129,28 +125,29 @@ func TestDigestMembership(t *testing.T) {
QueryVersion: version,
ActualVersion: version,
}
resultJSON, _ := json.Marshal(fakeResult)
mux.HandleFunc("/proofs/digest-membership", okHandler(resultJSON))
inputJSON, _ := json.Marshal(fakeResult)

serverURL, tearDown := setupServer(inputJSON)
defer tearDown()
client := setupClient([]string{serverURL})

result, err := client.MembershipDigest([]byte("digest"), version)
assert.NoError(t, err)
assert.Equal(t, fakeResult, result, "The results should match")
}

func TestMembershipWithServerFailure(t *testing.T) {
tearDown := setup()
serverURL, tearDown := setupServer(nil)
defer tearDown()
client := setupClient([]string{serverURL})

event := "Hello world!"
mux.HandleFunc("/proofs/membership", serverErrorHandler())

_, err := client.Membership([]byte(event), 0)
assert.Error(t, err)
}

func TestIncremental(t *testing.T) {
tearDown := setup()
defer tearDown()

start := uint64(2)
end := uint64(8)
Expand All @@ -160,42 +157,47 @@ func TestIncremental(t *testing.T) {
AuditPath: visitor.AuditPath{"0|0": []uint8{0x0}},
}

resultJSON, _ := json.Marshal(fakeResult)
mux.HandleFunc("/proofs/incremental", okHandler(resultJSON))
inputJSON, _ := json.Marshal(fakeResult)

serverURL, tearDown := setupServer(inputJSON)
defer tearDown()
client := setupClient([]string{serverURL})

result, err := client.Incremental(start, end)
assert.NoError(t, err)
assert.Equal(t, fakeResult, result, "The results should match")
assert.Equal(t, fakeResult, result, "The inputs should match")
}

func TestIncrementalWithServerFailure(t *testing.T) {
tearDown := setup()
serverURL, tearDown := setupServer(nil)
defer tearDown()

mux.HandleFunc("/proofs/incremental", serverErrorHandler())
client := setupClient([]string{serverURL})

_, err := client.Incremental(uint64(2), uint64(8))
assert.Error(t, err)

}

// TODO implement a test to verify proofs using fake hash function

func okHandler(result []byte) func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
func defaultHandler(input []byte) func(http.ResponseWriter, *http.Request) {
statusOK := func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
out := new(bytes.Buffer)
_ = json.Compact(out, result)
_ = json.Compact(out, input)
_, _ = w.Write(out.Bytes())
}
}

func serverErrorHandler() func(http.ResponseWriter, *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
statusInternalServerError := func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusInternalServerError)
}

if input == nil {
return statusInternalServerError
} else {
return statusOK
}
}

func infoHandler(serverURL string) func(http.ResponseWriter, *http.Request) {
Expand Down
Loading

0 comments on commit e8a4a1c

Please sign in to comment.