Skip to content

Commit

Permalink
exponential backoff implementation (#452)
Browse files Browse the repository at this point in the history
  • Loading branch information
arriven authored Mar 27, 2022
1 parent 19aee58 commit aea2c5e
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 40 deletions.
18 changes: 3 additions & 15 deletions src/jobs/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,23 +117,11 @@ func Get(t string) Job {
// BasicJobConfig comment for linter
type BasicJobConfig struct {
IntervalMs int `mapstructure:"interval_ms,omitempty"`
Count int `mapstructure:"count,omitempty"`

iter int
utils.Counter
*utils.BackoffConfig
}

// Next comment for linter
func (c *BasicJobConfig) Next(ctx context.Context) bool {
select {
case <-ctx.Done():
return false
case <-time.After(time.Duration(c.IntervalMs) * time.Millisecond):
if c.Count <= 0 {
return true
}

c.iter++

return c.iter <= c.Count
}
return utils.Sleep(ctx, time.Duration(c.IntervalMs)*time.Millisecond) && c.Counter.Next()
}
8 changes: 3 additions & 5 deletions src/jobs/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ package jobs

import (
"context"
"errors"
"fmt"
"log"
"time"
Expand Down Expand Up @@ -130,6 +129,8 @@ func fastHTTPJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalCo
return nil, err
}

backoffController := utils.NewBackoffController(jobConfig.BackoffConfig)

client := http.NewClient(ctx, *clientConfig, logger)

trafficMonitor := metrics.Default.NewWriter(metrics.Traffic, uuid.New().String())
Expand All @@ -156,11 +157,8 @@ func fastHTTPJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalCo
trafficMonitor.Add(uint64(dataSize))

if err := sendFastHTTPRequest(client, req, nil); err != nil {
if errors.Is(err, fasthttp.ErrHostClientRedirectToDifferentScheme) {
return nil, err
}

logger.Debug("error sending request", zap.Error(err), zap.Any("args", args))
backoffController.Handle(ctx, err)
} else {
processedTrafficMonitor.Add(uint64(dataSize))
}
Expand Down
13 changes: 9 additions & 4 deletions src/jobs/rawnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ func tcpJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig,
return nil, err
}

backoffController := utils.NewBackoffController(jobConfig.BackoffConfig)

if globalConfig.ProxyURLs != "" {
jobConfig.proxyURLs = templates.ParseAndExecute(logger, globalConfig.ProxyURLs, ctx)
}
Expand All @@ -72,13 +74,14 @@ func tcpJob(ctx context.Context, logger *zap.Logger, globalConfig *GlobalConfig,
}

for jobConfig.Next(ctx) {
sendTCP(ctx, logger, jobConfig, trafficMonitor, processedTrafficMonitor)
err = sendTCP(ctx, logger, jobConfig, trafficMonitor, processedTrafficMonitor)
backoffController.Handle(ctx, err)
}

return nil, nil
}

func sendTCP(ctx context.Context, logger *zap.Logger, jobConfig *rawnetConfig, trafficMonitor, processedTrafficMonitor *metrics.Writer) {
func sendTCP(ctx context.Context, logger *zap.Logger, jobConfig *rawnetConfig, trafficMonitor, processedTrafficMonitor *metrics.Writer) error {
// track sending of SYN packet
trafficMonitor.Add(packetgen.TCPHeaderSize + packetgen.IPHeaderSize)

Expand All @@ -87,7 +90,7 @@ func sendTCP(ctx context.Context, logger *zap.Logger, jobConfig *rawnetConfig, t
logger.Debug("error connecting via tcp", zap.String("addr", jobConfig.addr), zap.Error(err))
metrics.IncRawnetTCP(jobConfig.addr, metrics.StatusFail)

return
return err
}

defer conn.Close()
Expand All @@ -107,12 +110,14 @@ func sendTCP(ctx context.Context, logger *zap.Logger, jobConfig *rawnetConfig, t
if err != nil {
metrics.IncRawnetTCP(jobConfig.addr, metrics.StatusFail)

return
return err
}

processedTrafficMonitor.Add(uint64(n))
metrics.IncRawnetTCP(jobConfig.addr, metrics.StatusSuccess)
}

return nil
}

func udpJob(ctx context.Context, logger *zap.Logger, _ *GlobalConfig, args Args) (data interface{}, err error) {
Expand Down
89 changes: 89 additions & 0 deletions src/utils/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package utils

import (
"context"
"time"
)

func Sleep(ctx context.Context, t time.Duration) bool {
select {
case <-time.After(t):
return true
case <-ctx.Done():
return false
}
}

type BackoffConfig struct {
Multiplier int `mapstructure:"backoff_multiplier"`
Limit int `mapstructure:"backoff_limit"`
Timeout time.Duration `mapstructure:"backoff_timeout"`
}

func DefaultBackoffConfig() BackoffConfig {
const (
defaultMultiplier = 10
defaultLimit = 6
)

return BackoffConfig{Multiplier: defaultMultiplier, Limit: defaultLimit, Timeout: time.Microsecond}
}

type BackoffController struct {
BackoffConfig
count int
}

func NewBackoffController(c *BackoffConfig) BackoffController {
if c != nil {
return BackoffController{BackoffConfig: *c}
}

return BackoffController{BackoffConfig: DefaultBackoffConfig()}
}

func (c BackoffController) getTimeout() time.Duration {
result := c.Timeout
for i := 0; i < c.count; i++ {
result *= time.Duration(c.Multiplier)
}

return result
}

func (c *BackoffController) increment() {
if c.count < c.Limit {
c.count++
}
}

func (c *BackoffController) reset() {
c.count = 0
}

func (c *BackoffController) Handle(ctx context.Context, err error) {
if err == nil {
c.reset()

return
}

c.increment()
Sleep(ctx, c.getTimeout())
}

type Counter struct {
Count int `mapstructure:"count,omitempty"`

iter int
}

func (c *Counter) Next() bool {
if c.Count <= 0 {
return true
}

c.iter++

return c.iter <= c.Count
}
33 changes: 17 additions & 16 deletions src/utils/countrychecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,29 +40,30 @@ func CheckCountryOrFail(cfg *CountryCheckerConfig) string {
func CheckCountry(countriesToAvoid []string, strictCountryCheck bool) (bool, string) {
const maxFetchRetries = 3

var country, ip string

for retries := 1; ; retries++ {
log.Printf("Checking IP address, attempt #%d", retries)
var (
country, ip string
err error
)

var err error
if country, ip, err = fetchLocationInfo(); err != nil {
if retries < maxFetchRetries {
time.Sleep(time.Second)
counter := Counter{Count: maxFetchRetries}
backoffController := NewBackoffController(nil)

continue
}
for counter.Next() {
log.Printf("Checking IP address, attempt #%d", counter.iter)

if strictCountryCheck {
log.Printf("Failed to check the country info in %d attempts while in strict mode", maxFetchRetries)
if country, ip, err = fetchLocationInfo(); err != nil {
backoffController.Handle(context.Background(), err)
}
}

return false, ""
}
if err != nil {
if strictCountryCheck {
log.Printf("Failed to check the country info in %d attempts while in strict mode", maxFetchRetries)

return true, ""
return false, ""
}

break
return true, ""
}

log.Printf("Current country: %s (%s)", country, ip)
Expand Down

0 comments on commit aea2c5e

Please sign in to comment.