Skip to content

Commit

Permalink
Api client rate limiting (#163)
Browse files Browse the repository at this point in the history
* add rate limiting to api

* add test for rate limiting + refactor

* introduce retries with exponential backoff

* add tests for backoff

* cleanup + add logging on retries

* resolve RateLimit naming conflict

* fix review comments, add configurable logging
  • Loading branch information
benjvi authored and elithrar committed Mar 28, 2018
1 parent f0cb847 commit e1f3c42
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 21 deletions.
119 changes: 99 additions & 20 deletions cloudflare.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package cloudflare

import (
"bytes"
"context"
"encoding/json"
"io"
"io/ioutil"
"log"
"math"
"net/http"
"strings"
"time"

"github.com/pkg/errors"
"golang.org/x/time/rate"
)

const apiURL = "https://api.cloudflare.com/client/v4"
Expand All @@ -30,6 +36,9 @@ type API struct {
headers http.Header
httpClient *http.Client
authType int
rateLimiter *rate.Limiter
retryPolicy RetryPolicy
logger Logger
}

// New creates a new Cloudflare v4 API client.
Expand All @@ -38,12 +47,21 @@ func New(key, email string, opts ...Option) (*API, error) {
return nil, errors.New(errEmptyCredentials)
}

silentLogger := log.New(ioutil.Discard, "", log.LstdFlags)

api := &API{
APIKey: key,
APIEmail: email,
BaseURL: apiURL,
headers: make(http.Header),
authType: AuthKeyEmail,
APIKey: key,
APIEmail: email,
BaseURL: apiURL,
headers: make(http.Header),
authType: AuthKeyEmail,
rateLimiter: rate.NewLimiter(rate.Limit(4), 1), // 4rps equates to default api limit (1200 req/5 min)
retryPolicy: RetryPolicy{
MaxRetries: 3,
MinRetryDelay: time.Duration(1) * time.Second,
MaxRetryDelay: time.Duration(30) * time.Second,
},
logger: silentLogger,
}

err := api.parseOptions(opts...)
Expand Down Expand Up @@ -87,26 +105,73 @@ func (api *API) makeRequest(method, uri string, params interface{}) ([]byte, err

func (api *API) makeRequestWithAuthType(method, uri string, params interface{}, authType int) ([]byte, error) {
// Replace nil with a JSON object if needed
var reqBody io.Reader
var jsonBody []byte
var err error
if params != nil {
json, err := json.Marshal(params)
jsonBody, err = json.Marshal(params)
if err != nil {
return nil, errors.Wrap(err, "error marshalling params to JSON")
}
reqBody = bytes.NewReader(json)
} else {
reqBody = nil
jsonBody = nil
}

resp, err := api.request(method, uri, reqBody, authType)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var resp *http.Response
var respErr error
var reqBody io.Reader
var respBody []byte
for i := 0; i <= api.retryPolicy.MaxRetries; i++ {
if jsonBody != nil {
reqBody = bytes.NewReader(jsonBody)
}

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, errors.Wrap(err, "could not read response body")
if i > 0 {
// expect the backoff introduced here on errored requests to dominate the effect of rate limiting
// dont need a random component here as the rate limiter should do something similar
// nb time duration could truncate an arbitrary float. Since our inputs are all ints, we should be ok
sleepDuration := time.Duration(math.Pow(2, float64(i-1)) * float64(api.retryPolicy.MinRetryDelay))

if sleepDuration > api.retryPolicy.MaxRetryDelay {
sleepDuration = api.retryPolicy.MaxRetryDelay
}
// useful to do some simple logging here, maybe introduce levels later
api.logger.Printf("Sleeping %s before retry attempt number %d for request %s %s", sleepDuration.String(), i, method, uri)
time.Sleep(sleepDuration)
}
api.rateLimiter.Wait(context.TODO())
if err != nil {
return nil, errors.Wrap(err, "Error caused by request rate limiting")
}
resp, respErr = api.request(method, uri, reqBody, authType)

// retry if the server is rate limiting us or if it failed
// assumes server operations are rolled back on failure
if respErr != nil || resp.StatusCode == http.StatusTooManyRequests || resp.StatusCode >= 500 {
// if we got a valid http response, try to read body so we can reuse the connection
// see https://golang.org/pkg/net/http/#Client.Do
if respErr == nil {
respBody, err = ioutil.ReadAll(resp.Body)
resp.Body.Close()

respErr = errors.Wrap(err, "could not read response body")

api.logger.Printf("Request: %s %s got an error response %d: %s\n", method, uri, resp.StatusCode,
strings.Replace(strings.Replace(string(respBody), "\n", "", -1), "\t", "", -1))
} else {
api.logger.Printf("Error performing request: %s %s : %s \n", method, uri, respErr.Error())
}
continue
} else {
respBody, err = ioutil.ReadAll(resp.Body)
defer resp.Body.Close()
if err != nil {
return nil, errors.Wrap(err, "could not read response body")
}
break
}
}
if respErr != nil {
return nil, respErr
}

switch {
Expand All @@ -124,13 +189,13 @@ func (api *API) makeRequestWithAuthType(method, uri string, params interface{},
return nil, errors.Errorf("HTTP status %d: service failure", resp.StatusCode)
default:
var s string
if body != nil {
s = string(body)
if respBody != nil {
s = string(respBody)
}
return nil, errors.Errorf("HTTP status %d: content %q", resp.StatusCode, s)
}

return body, nil
return respBody, nil
}

// request makes a HTTP request to the given API endpoint, returning the raw
Expand Down Expand Up @@ -237,3 +302,17 @@ type PaginationOptions struct {
Page int `json:"page,omitempty"`
PerPage int `json:"per_page,omitempty"`
}

// RetryPolicy specifies number of retries and min/max retry delays
// This config is used when the client exponentially backs off after errored requests
type RetryPolicy struct {
MaxRetries int
MinRetryDelay time.Duration
MaxRetryDelay time.Duration
}

// Logger defines the interface this library needs to use logging
// This is a subset of the methods implemented in the log package
type Logger interface {
Printf(format string, v ...interface{})
}
101 changes: 101 additions & 0 deletions cloudflare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ func setup(opts ...Option) {
mux = http.NewServeMux()
server = httptest.NewServer(mux)

// disable rate limits and retries in testing - prepended so any provided value overrides this
opts = append([]Option{UsingRateLimit(100000), UsingRetryPolicy(0, 0, 0)}, opts...)

// Cloudflare client configured to use test server
client, _ = New("deadbeef", "[email protected]", opts...)
client.BaseURL = server.URL
Expand Down Expand Up @@ -100,3 +103,101 @@ func TestClient_Auth(t *testing.T) {

assert.NoError(t, err)
}

func TestClient_RetryCanSucceedAfterErrors(t *testing.T) {
setup(UsingRetryPolicy(2, 0, 1))
defer teardown()

requestsReceived := 0
// could test any function, using ListLoadBalancerPools
handler := func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, r.Method, "GET", "Expected method 'GET', got %s", r.Method)
w.Header().Set("content-type", "application/json")

// we are doing three *retries*
if requestsReceived == 0 {
// return error causing client to retry
w.WriteHeader(500)
fmt.Fprint(w, `{
"success": false,
"errors": [ "server created some error"],
"messages": [],
"result": []
}`)
} else if requestsReceived == 1 {
// return error causing client to retry
w.WriteHeader(429)
fmt.Fprint(w, `{
"success": false,
"errors": [ "this is a rate limiting error"],
"messages": [],
"result": []
}`)
} else {
// return success response
fmt.Fprint(w, `{
"success": true,
"errors": [],
"messages": [],
"result": [
{
"id": "17b5962d775c646f3f9725cbc7a53df4",
"created_on": "2014-01-01T05:20:00.12345Z",
"modified_on": "2014-02-01T05:20:00.12345Z",
"description": "Primary data center - Provider XYZ",
"name": "primary-dc-1",
"enabled": true,
"monitor": "f1aba936b94213e5b8dca0c0dbf1f9cc",
"origins": [
{
"name": "app-server-1",
"address": "0.0.0.0",
"enabled": true
}
],
"notification_email": "[email protected]"
}
],
"result_info": {
"page": 1,
"per_page": 20,
"count": 1,
"total_count": 2000
}
}`)
}
requestsReceived++

}

mux.HandleFunc("/user/load_balancers/pools", handler)

_, err := client.ListLoadBalancerPools()
assert.NoError(t, err)
}

func TestClient_RetryReturnsPersistentErrorResponse(t *testing.T) {
setup(UsingRetryPolicy(2, 0, 1))
defer teardown()

// could test any function, using ListLoadBalancerPools
handler := func(w http.ResponseWriter, r *http.Request) {
assert.Equal(t, r.Method, "GET", "Expected method 'GET', got %s", r.Method)
w.Header().Set("content-type", "application/json")

// return error causing client to retry
w.WriteHeader(500)
fmt.Fprint(w, `{
"success": false,
"errors": [ "server created some error"],
"messages": [],
"result": []
}`)

}

mux.HandleFunc("/user/load_balancers/pools", handler)

_, err := client.ListLoadBalancerPools()
assert.Error(t, err)
}
44 changes: 43 additions & 1 deletion options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package cloudflare

import "net/http"
import (
"net/http"

"time"

"golang.org/x/time/rate"
)

// Option is a functional option for configuring the API client.
type Option func(*API) error
Expand Down Expand Up @@ -31,6 +37,42 @@ func UsingOrganization(orgID string) Option {
}
}

// UsingRateLimit applies a non-default rate limit to client API requests
// If not specified the default of 4rps will be applied
func UsingRateLimit(rps float64) Option {
return func(api *API) error {
// because ratelimiter doesnt do any windowing
// setting burst makes it difficult to enforce a fixed rate
// so setting it equal to 1 this effectively disables bursting
// this doesn't check for sensible values, ultimately the api will enforce that the value is ok
api.rateLimiter = rate.NewLimiter(rate.Limit(rps), 1)
return nil
}
}

// UsingRetryPolicy applies a non-default number of retries and min/max retry delays
// This will be used when the client exponentially backs off after errored requests
func UsingRetryPolicy(maxRetries int, minRetryDelaySecs int, maxRetryDelaySecs int) Option {
// seconds is very granular for a minimum delay - but this is only in case of failure
return func(api *API) error {
api.retryPolicy = RetryPolicy{
MaxRetries: maxRetries,
MinRetryDelay: time.Duration(minRetryDelaySecs) * time.Second,
MaxRetryDelay: time.Duration(maxRetryDelaySecs) * time.Second,
}
return nil
}
}

// UsingLogger can be set if you want to get log output from this API instance
// By default no log output is emitted
func UsingLogger(logger Logger) Option {
return func(api *API) error {
api.logger = logger
return nil
}
}

// parseOptions parses the supplied options functions and returns a configured
// *API instance.
func (api *API) parseOptions(opts ...Option) error {
Expand Down

0 comments on commit e1f3c42

Please sign in to comment.