-
Notifications
You must be signed in to change notification settings - Fork 61
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: Improve retry strategy by reseting default hosts after some time
We recently discovered that the implementation of the retry strategy for the client was lacking an important feature: the ability to reset hosts marked as down after some time. Because of that, an host that was marked down could never be retried without instantiating a new client. This commit is a complete rewrite of the retry strategy implementation to address this issue. For simplicity, we decoupled the retry strategy implementation from the transport layer, responsible for issuing the HTTP calls. This new implementation was written in a concurrent-safe way, meaning that concurrent calls should not result in UB regarding the state of the retry strategy's hosts. As the `context` package is now used to be able to effeciently timeout individual requets, we now have to deprecate support for version Go 1.6 and before (we were previously supporting Go 1.6 and higher) as this package was only introduced in the standard library of Go 1.7.
- Loading branch information
Showing
5 changed files
with
432 additions
and
313 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
package call | ||
|
||
type Kind int | ||
|
||
const ( | ||
Read Kind = iota | ||
Write | ||
Analytics | ||
) | ||
|
||
func IsRead(k Kind) bool { return k == Read } | ||
func IsWrite(k Kind) bool { return k == Write } | ||
func IsAnalytics(k Kind) bool { return k == Analytics } | ||
func IsReadWrite(k Kind) bool { return IsRead(k) || IsWrite(k) } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,34 @@ | ||
package algoliasearch | ||
|
||
import "errors" | ||
import ( | ||
"errors" | ||
"net" | ||
) | ||
|
||
var ( | ||
NoMoreHitsErr error = errors.New("No more hits") | ||
NoMoreSynonymsErr error = errors.New("No more synonyms") | ||
NoMoreRulesErr error = errors.New("No more rules") | ||
NoMoreHitsErr error = errors.New("No more hits") | ||
NoMoreSynonymsErr error = errors.New("No more synonyms") | ||
NoMoreRulesErr error = errors.New("No more rules") | ||
ExhaustionOfTryableHostsErr error = errors.New("All hosts have been contacted unsuccessfully") | ||
) | ||
|
||
// NetError is used internally to differente regular error from errors | ||
// following the net.Error interface in order to propagate them with a custom | ||
// message. | ||
type NetError struct { | ||
msg string | ||
isTimeout bool | ||
isTemporary bool | ||
} | ||
|
||
func NewNetError(err net.Error, msg string) *NetError { | ||
return &NetError{ | ||
msg: msg, | ||
isTimeout: err.Timeout(), | ||
isTemporary: err.Temporary(), | ||
} | ||
} | ||
|
||
func (e *NetError) Error() string { return e.msg } | ||
func (e *NetError) Timeout() bool { return e.isTimeout } | ||
func (e *NetError) Temporary() bool { return e.isTemporary } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,252 @@ | ||
package algoliasearch | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/rand" | ||
"net" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/algolia/algoliasearch-client-go/algoliasearch/call" | ||
) | ||
|
||
func init() { | ||
rand.Seed(int64(time.Now().Nanosecond())) | ||
} | ||
|
||
type Outcome int | ||
|
||
const ( | ||
DefaultReadTimeout = 5 * time.Second | ||
DefaultWriteTimeout = 30 * time.Second | ||
DefaultAnalyticsTimeout = 30 * time.Second | ||
|
||
Success Outcome = iota | ||
Failure | ||
Retry | ||
) | ||
|
||
type TryableHost interface { | ||
Host() string | ||
Timeout() time.Duration | ||
} | ||
|
||
type tryableHost struct { | ||
host string | ||
timeout time.Duration | ||
} | ||
|
||
func (h *tryableHost) Host() string { return h.host } | ||
func (h *tryableHost) Timeout() time.Duration { return h.timeout } | ||
func (h *tryableHost) String() string { return fmt.Sprintf("tryableHost{%s,%s}", h.host, h.timeout) } | ||
|
||
type RetryStrategy interface { | ||
// GetTryableHosts returns the slice of host to try to send the request to. | ||
GetTryableHosts(k call.Kind) []TryableHost | ||
|
||
// Decide returns an Outcome defining if the call have succeded, or failed | ||
// or to be retried. | ||
Decide(h TryableHost, code int, err error) Outcome | ||
|
||
// SetTimeouts updates the internal timeouts for read, write (i.e. | ||
// indexing) and analytics calls. Negative values are simply ignored, | ||
// leaving the original timeouts unchanged. | ||
SetTimeouts(read, write, analytics time.Duration) | ||
} | ||
|
||
type retryStrategy struct { | ||
sync.RWMutex | ||
hosts []*statefulHost | ||
readTimeout time.Duration | ||
writeTimeout time.Duration | ||
analyticsTimeout time.Duration | ||
} | ||
|
||
type statefulHost struct { | ||
host string | ||
isDown bool | ||
retryCount int | ||
lastUpdate time.Time | ||
accept func(k call.Kind) bool | ||
} | ||
|
||
func (h *statefulHost) String() string { | ||
return fmt.Sprintf( | ||
"statefulHost{host:%s, isDown: %t, retryCount:%d}", | ||
h.host, | ||
h.isDown, | ||
h.retryCount, | ||
) | ||
} | ||
|
||
func (h *statefulHost) reset() { | ||
h.isDown = false | ||
h.lastUpdate = time.Now() | ||
h.retryCount = 0 | ||
} | ||
|
||
func NewRetryStrategy(appID string, providedHosts []string) *retryStrategy { | ||
var allHosts []*statefulHost | ||
now := time.Now() | ||
|
||
if providedHosts != nil && len(providedHosts) > 0 { | ||
for _, h := range providedHosts { | ||
allHosts = append(allHosts, &statefulHost{host: h, lastUpdate: now, accept: call.IsReadWrite}) | ||
} | ||
} else { | ||
allHosts = append(allHosts, &statefulHost{host: appID + "-dsn.algolia.net", lastUpdate: now, accept: call.IsRead}) | ||
allHosts = append(allHosts, &statefulHost{host: appID + ".algolia.net", lastUpdate: now, accept: call.IsWrite}) | ||
allHosts = append(allHosts, shuffle( | ||
[]*statefulHost{ | ||
&statefulHost{host: appID + "-1.algolianet.com", lastUpdate: now, accept: call.IsReadWrite}, | ||
&statefulHost{host: appID + "-2.algolianet.com", lastUpdate: now, accept: call.IsReadWrite}, | ||
&statefulHost{host: appID + "-3.algolianet.com", lastUpdate: now, accept: call.IsReadWrite}, | ||
}, | ||
)...) | ||
} | ||
allHosts = append(allHosts, &statefulHost{host: "analytics.algolia.com", lastUpdate: now, accept: call.IsAnalytics}) | ||
|
||
return &retryStrategy{ | ||
hosts: allHosts, | ||
readTimeout: DefaultReadTimeout, | ||
writeTimeout: DefaultWriteTimeout, | ||
analyticsTimeout: DefaultAnalyticsTimeout, | ||
} | ||
} | ||
|
||
func (s *retryStrategy) GetTryableHosts(k call.Kind) []TryableHost { | ||
s.resetExpiredHosts() | ||
|
||
s.Lock() | ||
defer s.Unlock() | ||
|
||
var baseTimeout time.Duration | ||
switch k { | ||
case call.Read: | ||
baseTimeout = s.readTimeout | ||
case call.Write: | ||
baseTimeout = s.writeTimeout | ||
case call.Analytics: | ||
baseTimeout = s.analyticsTimeout | ||
default: | ||
return nil | ||
} | ||
|
||
var hosts []TryableHost | ||
for _, h := range s.hosts { | ||
if !h.isDown && h.accept(k) { | ||
hosts = append(hosts, &tryableHost{h.host, baseTimeout * time.Duration(h.retryCount+1)}) | ||
} | ||
} | ||
if len(hosts) > 0 { | ||
return hosts | ||
} | ||
for _, h := range s.hosts { | ||
if h.accept(k) { | ||
h.reset() | ||
hosts = append(hosts, &tryableHost{h.host, baseTimeout}) | ||
} | ||
} | ||
return hosts | ||
} | ||
|
||
func (s *retryStrategy) Decide(h TryableHost, code int, err error) Outcome { | ||
if err == nil && is2xx(code) { | ||
s.markUp(h.Host()) | ||
return Success | ||
} | ||
|
||
if isTimeoutError(err) { | ||
s.markTimeouted(h.Host()) | ||
return Retry | ||
} | ||
|
||
if !(isZero(code) || is4xx(code) || is2xx(code)) || isNetworkError(err) { | ||
s.markDown(h.Host()) | ||
return Retry | ||
} | ||
|
||
return Failure | ||
} | ||
|
||
func (s *retryStrategy) SetTimeouts(read, write, analytics time.Duration) { | ||
s.Lock() | ||
defer s.Unlock() | ||
|
||
if read > 0 { | ||
s.readTimeout = read | ||
} | ||
if write > 0 { | ||
s.writeTimeout = write | ||
} | ||
if analytics > 0 { | ||
s.analyticsTimeout = analytics | ||
} | ||
} | ||
|
||
func (s *retryStrategy) markUp(host string) { s.update(host, false, false) } | ||
func (s *retryStrategy) markDown(host string) { s.update(host, true, false) } | ||
func (s *retryStrategy) markTimeouted(host string) { s.update(host, false, true) } | ||
func (s *retryStrategy) update(host string, isDown, isTimeout bool) { | ||
s.Lock() | ||
defer s.Unlock() | ||
|
||
for _, h := range s.hosts { | ||
if h.host == host { | ||
h.isDown = isDown | ||
h.lastUpdate = time.Now() | ||
if isTimeout { | ||
h.retryCount++ | ||
} else { | ||
h.retryCount = 0 | ||
} | ||
return | ||
} | ||
} | ||
} | ||
|
||
func (s *retryStrategy) resetExpiredHosts() { | ||
s.Lock() | ||
defer s.Unlock() | ||
|
||
for _, h := range s.hosts { | ||
if h.isDown && time.Since(h.lastUpdate) > 5*time.Minute { | ||
h.reset() | ||
} | ||
} | ||
} | ||
|
||
func shuffle(hosts []*statefulHost) []*statefulHost { | ||
if hosts == nil { | ||
return nil | ||
} | ||
shuffled := make([]*statefulHost, len(hosts)) | ||
for i, v := range rand.Perm(len(hosts)) { | ||
shuffled[i] = hosts[v] | ||
} | ||
return shuffled | ||
} | ||
|
||
func isNetworkError(err error) bool { | ||
if err == nil { | ||
return false | ||
} | ||
_, ok := err.(net.Error) | ||
// We need to ensure that the error is a net.Error but not a | ||
// context.DeadlineExceeded error (which is actually a net.Error), because | ||
// we do not want to consider context.DeadlineExceeded as an error. | ||
return ok && !isTimeoutError(err) | ||
} | ||
|
||
func isTimeoutError(err error) bool { | ||
if err == nil { | ||
return false | ||
} | ||
return strings.Contains(err.Error(), context.DeadlineExceeded.Error()) | ||
} | ||
|
||
func isZero(code int) bool { return code == 0 } | ||
func is2xx(code int) bool { return 200 <= code && code < 300 } | ||
func is4xx(code int) bool { return 400 <= code && code < 500 } |
Oops, something went wrong.