Skip to content

Commit

Permalink
Merge pull request #627 from containous/add-long-job-exponential-backoff
Browse files Browse the repository at this point in the history
Add long job exponential backoff
  • Loading branch information
vdemeester authored Aug 19, 2016
2 parents fc19ab2 + 97ddfcb commit 95e8f0a
Show file tree
Hide file tree
Showing 10 changed files with 68 additions and 27 deletions.
2 changes: 1 addition & 1 deletion middlewares/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewLogger(file string) *Logger {
if len(file) > 0 {
fi, err := os.OpenFile(file, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatal("Error opening file", err)
log.Error("Error opening file", err)
}
return &Logger{fi}
}
Expand Down
5 changes: 3 additions & 2 deletions provider/consul_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cenkalti/backoff"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/containous/traefik/utils"
"github.com/hashicorp/consul/api"
)

Expand Down Expand Up @@ -320,9 +321,9 @@ func (provider *ConsulCatalog) Provide(configurationChan chan<- types.ConfigMess
worker := func() error {
return provider.watch(configurationChan, stop)
}
err := backoff.RetryNotify(worker, backoff.NewExponentialBackOff(), notify)
err := utils.RetryNotifyJob(worker, backoff.NewExponentialBackOff(), notify)
if err != nil {
log.Fatalf("Cannot connect to consul server %+v", err)
log.Errorf("Cannot connect to consul server %+v", err)
}
})

Expand Down
5 changes: 3 additions & 2 deletions provider/docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cenkalti/backoff"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/containous/traefik/utils"
"github.com/containous/traefik/version"
"github.com/docker/engine-api/client"
dockertypes "github.com/docker/engine-api/types"
Expand Down Expand Up @@ -139,9 +140,9 @@ func (provider *Docker) Provide(configurationChan chan<- types.ConfigMessage, po
notify := func(err error, time time.Duration) {
log.Errorf("Docker connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
if err != nil {
log.Fatalf("Cannot connect to docker server %+v", err)
log.Errorf("Cannot connect to docker server %+v", err)
}
})

Expand Down
15 changes: 5 additions & 10 deletions provider/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/containous/traefik/provider/k8s"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"io"
"github.com/containous/traefik/utils"
"io/ioutil"
"os"
"reflect"
Expand Down Expand Up @@ -104,7 +104,7 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
for {
stopWatch := make(chan bool, 5)
defer close(stopWatch)
log.Debugf("Using lable selector: %s", provider.LabelSelector)
log.Debugf("Using label selector: '%s'", provider.LabelSelector)
eventsChan, errEventsChan, err := k8sClient.WatchAll(provider.LabelSelector, stopWatch)
if err != nil {
log.Errorf("Error watching kubernetes events: %v", err)
Expand All @@ -116,18 +116,13 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
return nil
}
}
Watch:
for {
select {
case <-stop:
stopWatch <- true
return nil
case err, ok := <-errEventsChan:
case err, _ := <-errEventsChan:
stopWatch <- true
if ok && strings.Contains(err.Error(), io.EOF.Error()) {
// edge case, kubernetes long-polling disconnection
break Watch
}
return err
case event := <-eventsChan:
log.Debugf("Received event from kubernetes %+v", event)
Expand All @@ -152,9 +147,9 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
notify := func(err error, time time.Duration) {
log.Errorf("Kubernetes connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backOff, notify)
err := utils.RetryNotifyJob(operation, backOff, notify)
if err != nil {
log.Fatalf("Cannot connect to Kubernetes server %+v", err)
log.Errorf("Cannot connect to Kubernetes server %+v", err)
}
})

Expand Down
5 changes: 3 additions & 2 deletions provider/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cenkalti/backoff"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/containous/traefik/utils"
"github.com/docker/libkv"
"github.com/docker/libkv/store"
)
Expand Down Expand Up @@ -75,7 +76,7 @@ func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix
notify := func(err error, time time.Duration) {
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
if err != nil {
return fmt.Errorf("Cannot connect to KV server: %v", err)
}
Expand Down Expand Up @@ -105,7 +106,7 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *
notify := func(err error, time time.Duration) {
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
if err != nil {
return fmt.Errorf("Cannot connect to KV server: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions provider/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cenkalti/backoff"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/containous/traefik/utils"
"github.com/gambol99/go-marathon"
"net/http"
"time"
Expand Down Expand Up @@ -108,9 +109,9 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage,
notify := func(err error, time time.Duration) {
log.Errorf("Marathon connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
if err != nil {
log.Fatalf("Cannot connect to Marathon server %+v", err)
log.Errorf("Cannot connect to Marathon server %+v", err)
}
return nil
}
Expand Down
9 changes: 5 additions & 4 deletions provider/mesos.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/cenkalti/backoff"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
"github.com/containous/traefik/utils"
"github.com/mesos/mesos-go/detector"
_ "github.com/mesos/mesos-go/detector/zoo" // Registers the ZK detector
"github.com/mesosphere/mesos-dns/detect"
Expand Down Expand Up @@ -110,9 +111,9 @@ func (provider *Mesos) Provide(configurationChan chan<- types.ConfigMessage, poo
notify := func(err error, time time.Duration) {
log.Errorf("mesos connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
err := utils.RetryNotifyJob(operation, backoff.NewExponentialBackOff(), notify)
if err != nil {
log.Fatalf("Cannot connect to mesos server %+v", err)
log.Errorf("Cannot connect to mesos server %+v", err)
}
return nil
}
Expand Down Expand Up @@ -393,9 +394,9 @@ func detectMasters(zk string, masters []string) <-chan []string {
if zk != "" {
log.Debugf("Starting master detector for ZK ", zk)
if md, err := detector.New(zk); err != nil {
log.Fatalf("failed to create master detector: %v", err)
log.Errorf("failed to create master detector: %v", err)
} else if err := md.Detect(detect.NewMasters(masters, changed)); err != nil {
log.Fatalf("failed to initialize master detector: %v", err)
log.Errorf("failed to initialize master detector: %v", err)
}
} else {
changed <- masters
Expand Down
4 changes: 2 additions & 2 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ func (server *Server) prepareServer(entryPointName string, router *middlewares.H
negroni.UseHandler(router)
tlsConfig, err := server.createTLSConfig(entryPointName, entryPoint.TLS, router)
if err != nil {
log.Fatalf("Error creating TLS config %s", err)
log.Errorf("Error creating TLS config %s", err)
return nil, err
}

Expand All @@ -431,7 +431,7 @@ func (server *Server) prepareServer(entryPointName string, router *middlewares.H
TLSConfig: tlsConfig,
}, tlsConfig)
if err != nil {
log.Fatalf("Error hijacking server %s", err)
log.Errorf("Error hijacking server %s", err)
return nil, err
}
return gracefulServer, nil
Expand Down
4 changes: 2 additions & 2 deletions traefik.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func run(traefikConfiguration *TraefikConfiguration) {
// logging
level, err := log.ParseLevel(strings.ToLower(globalConfiguration.LogLevel))
if err != nil {
log.Fatal("Error getting level", err)
log.Error("Error getting level", err)
}
log.SetLevel(level)
if len(globalConfiguration.TraefikLogsFile) > 0 {
Expand All @@ -214,7 +214,7 @@ func run(traefikConfiguration *TraefikConfiguration) {
}
}()
if err != nil {
log.Fatal("Error opening file", err)
log.Error("Error opening file", err)
} else {
log.SetOutput(fi)
log.SetFormatter(&log.TextFormatter{DisableColors: true, FullTimestamp: true, DisableSorting: true})
Expand Down
41 changes: 41 additions & 0 deletions utils/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package utils

import (
"github.com/cenkalti/backoff"
"time"
)

// minLongJobInterval is the run-time threshold used by RetryNotifyJob: when a
// failed operation ran for at least this long, the backoff is reset before the
// next wait duration is requested.
const (
minLongJobInterval = 30 * time.Second
)

// RetryNotifyJob runs operation in a loop until it returns nil, or until the
// backoff policy b answers backoff.Stop, in which case the error from the
// last attempt is returned.
//
// Every attempt is timed. If a failed attempt ran for at least
// minLongJobInterval, b.Reset() is called before the next wait duration is
// requested from b.
//
// When notify is non-nil, it is called with the attempt's error and the
// upcoming wait duration just before sleeping.
func RetryNotifyJob(operation backoff.Operation, b backoff.BackOff, notify backoff.Notify) error {
	b.Reset()
	for {
		startedAt := time.Now()
		opErr := operation()
		if opErr == nil {
			return nil
		}

		// Long job: reset the backoff before asking for the next interval.
		if ranFor := time.Since(startedAt); ranFor >= minLongJobInterval {
			b.Reset()
		}

		wait := b.NextBackOff()
		if wait == backoff.Stop {
			return opErr
		}

		if notify != nil {
			notify(opErr, wait)
		}
		time.Sleep(wait)
	}
}

0 comments on commit 95e8f0a

Please sign in to comment.