Skip to content

Commit

Permalink
Add exponentialBackoff selecting endpoints via RoundRobin.
Browse files Browse the repository at this point in the history
Add tests given wrong server endpoints.
Create new Topology struct.
  • Loading branch information
Jose Luis Lucas authored and iknite committed Feb 21, 2019
1 parent 0b41e5b commit c7b019b
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 44 deletions.
83 changes: 43 additions & 40 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
type HTTPClient struct {
conf *Config
*http.Client
topology Topology
}

// NewHTTPClient will return a new instance of HTTPClient.
Expand All @@ -64,16 +65,19 @@ func NewHTTPClient(conf Config) *HTTPClient {
TLSHandshakeTimeout: 5 * time.Second,
},
},
Topology{},
}

conf.ClusterLeader = conf.Endpoints[0]
// Initial topology assignment
client.topology.Leader = conf.Endpoints[0]
client.topology.Endpoints = conf.Endpoints

info, err := client.getClusterInfo()
if err != nil {
log.Errorf("Failed to get raft cluster info: %v", err)
return nil
}
client.updateConf(info)
client.updateTopology(info)

return client
}
Expand All @@ -83,11 +87,25 @@ func (c *HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, erro
var retries uint

for {
url, err := url.Parse(c.topology.Leader + path)
if err != nil {
return nil, err //panic(err)
}

req, err := http.NewRequest(method, fmt.Sprintf("%s", url), bytes.NewBuffer(data))
if err != nil {
return nil, err //panic(err)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.conf.APIKey)

resp, err := c.Do(req)
if err != nil {
if retries == 5 {
return nil, err
}
c.topology.Leader = c.topology.Endpoints[rand.Intn(len(c.topology.Endpoints))]
retries = retries + 1
delay := time.Duration(10 << retries * time.Millisecond)
time.Sleep(delay)
Expand All @@ -100,19 +118,7 @@ func (c *HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, erro
func (c HTTPClient) getClusterInfo() (map[string]interface{}, error) {
info := make(map[string]interface{})

req, err := http.NewRequest("GET", c.conf.ClusterLeader+"/info/shards", bytes.NewBuffer([]byte{}))
if err != nil {
return info, err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.conf.APIKey)
resp, err := c.Do(req)
if err != nil {
return info, err
}
defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
body, err := c.doReq("GET", "/info/shards", []byte{})
if err != nil {
return info, err
}
Expand All @@ -122,10 +128,10 @@ func (c HTTPClient) getClusterInfo() (map[string]interface{}, error) {
return info, err
}

return info, err
return info, nil
}

func (c *HTTPClient) updateConf(info map[string]interface{}) {
func (c *HTTPClient) updateTopology(info map[string]interface{}) {

clusterMeta := info["meta"].(map[string]interface{})
leaderID := info["leaderID"].(string)
Expand All @@ -140,7 +146,7 @@ func (c *HTTPClient) updateConf(info map[string]interface{}) {
leaderAddr = scheme + addr.(string)
}
}
c.conf.ClusterLeader = leaderAddr
c.topology.Leader = leaderAddr

for _, nodeMeta := range clusterMeta {
for k, address := range nodeMeta.(map[string]interface{}) {
Expand All @@ -150,24 +156,12 @@ func (c *HTTPClient) updateConf(info map[string]interface{}) {
}
}
}
c.conf.Endpoints = endpoints
c.topology.Endpoints = endpoints
}

func (c HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {
url, err := url.Parse(c.conf.ClusterLeader + path)
if err != nil {
panic(err)
}
func (c *HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {

req, err := http.NewRequest(method, fmt.Sprintf("%s", url), bytes.NewBuffer(data))
if err != nil {
panic(err)
}

req.Header.Set("Content-Type", "application/json")
req.Header.Set("Api-Key", c.conf.APIKey)

resp, err := c.exponentialBackoff(req)
resp, err := c.exponentialBackoff(method, path, data)
if err != nil {
return nil, err
}
Expand All @@ -186,14 +180,23 @@ func (c HTTPClient) doReq(method, path string, data []byte) ([]byte, error) {
return bodyBytes, nil
}

// Ping will do a request to the server
func (c HTTPClient) Ping() error {
_, err := c.doReq("GET", "/health-check", nil)
if err != nil {
return err
}
func (c *HTTPClient) exponentialBackoff(req *http.Request) (*http.Response, error) {

var retries uint

return nil
for {
resp, err := c.Do(req)
if err != nil {
if retries == 5 {
return nil, err
}
retries = retries + 1
delay := time.Duration(10 << retries * time.Millisecond)
time.Sleep(delay)
continue
}
return resp, err
}
}

// Add will do a request to the server with a post data to store a new event.
Expand Down
13 changes: 9 additions & 4 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
package client

type Config struct {
// Endpoint [host:port] to operate.
// Must be the QED cluster leader.
ClusterLeader string

// Endpoints [host:port,host:port,...] to ask for QED cluster-topology.
Endpoints []string

Expand All @@ -40,6 +36,15 @@ type Config struct {
HandshakeTimeoutSeconds int
}

type Topology struct {
// Topology endpoints [host:port,host:port,...a]
Endpoints []string

// Endpoint [host:port] to operate.
// Must be the QED cluster leader.
Leader string
}

func DefaultConfig() *Config {
return &Config{
Endpoints: []string{"127.0.0.1:8800"},
Expand Down
59 changes: 59 additions & 0 deletions tests/e2e/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,62 @@ func TestCluster(t *testing.T) {

})
}

func TestClusterBadEndpoint(t *testing.T) {
before0, after0 := setupServer(0, "", false, t)
before1, after1 := setupServer(1, "", false, t)

serversHttpAddr := "badendpoint,http://127.0.0.1:8080"

scenario, let := scope.Scope(t, merge(before0, before1), merge(after0, after1))

scenario("Add one event through cli and verify it", func() {
let("Add event", func(t *testing.T) {
cmd := exec.Command("go",
"run",
"./../../main.go",
fmt.Sprintf("--apikey=%s", APIKey),
"client",
fmt.Sprintf("--endpoints=%s", serversHttpAddr),
"add",
"--key='test event'",
"--value=2",
"--log=info",
)

_, err := cmd.CombinedOutput()

assert.NoErrorf(t, err, "Subprocess must not exit with status 1: %v", *cmd)
})

})
}

func TestSingleBadEndpoint(t *testing.T) {
before0, after0 := setupServer(0, "", false, t)

serversHttpAddr := "badendpoint"

scenario, let := scope.Scope(t, merge(before0), merge(after0))

scenario("Add one event through cli and verify it", func() {
let("Add event", func(t *testing.T) {
cmd := exec.Command("go",
"run",
"./../../main.go",
fmt.Sprintf("--apikey=%s", APIKey),
"client",
fmt.Sprintf("--endpoints=%s", serversHttpAddr),
"add",
"--key='test event'",
"--value=2",
"--log=info",
)

_, err := cmd.CombinedOutput()

assert.Errorf(t, err, "Subprocess must exit with status 1: %v", *cmd)
})

})
}

0 comments on commit c7b019b

Please sign in to comment.