Skip to content

Commit

Permalink
Handle retries and state refresh when dirty
Browse files Browse the repository at this point in the history
  • Loading branch information
Thibault Gilles committed Oct 4, 2019
1 parent 9594138 commit 556c75e
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 63 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.12
require (
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect
github.com/criteo/haproxy-spoe-go v0.0.0-20190925130734-97891c13d324
github.com/d4l3k/messagediff v1.2.1 // indirect
github.com/docker/go-units v0.4.0 // indirect
github.com/facebookgo/freeport v0.0.0-20150612182905-d4adf43b75b9
github.com/go-openapi/analysis v0.19.0 // indirect
Expand All @@ -25,5 +26,6 @@ require (
golang.org/x/net v0.0.0-20190607181551-461777fb6f67 // indirect
golang.org/x/sys v0.0.0-20190528012530-adf421d2caf4 // indirect
golang.org/x/text v0.3.2 // indirect
gopkg.in/d4l3k/messagediff.v1 v1.2.1
gopkg.in/mcuadros/go-syslog.v2 v2.2.1
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ github.com/coredns/coredns v1.1.2 h1:bAFHrSsBeTeRG5W3Nf2su3lUGw7Npw2UKeCJm/3A638
github.com/coredns/coredns v1.1.2/go.mod h1:zASH/MVDgR6XZTbxvOnsZfffS+31vg6Ackf/wo1+AM0=
github.com/criteo/haproxy-spoe-go v0.0.0-20190925130734-97891c13d324 h1:EG4AakHHowlW2TkSX6URMubHsmRjd0HWl4LtI4pD7WA=
github.com/criteo/haproxy-spoe-go v0.0.0-20190925130734-97891c13d324/go.mod h1:3h7I0HgdYy7SIlcSLEUVLpFTfHA0V4qK6QsQEKNLRkI=
github.com/d4l3k/messagediff v1.2.1 h1:ZcAIMYsUg0EAp9X+tt8/enBE/Q8Yd5kzPynLyKptt9U=
github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkEQxENCrlLo=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -467,6 +469,8 @@ gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUy
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/d4l3k/messagediff.v1 v1.2.1 h1:70AthpjunwzUiarMHyED52mj9UwtAnE89l1Gmrt3EU0=
gopkg.in/d4l3k/messagediff.v1 v1.2.1/go.mod h1:EUzikiKadqXWcD1AzJLagx0j/BeeWGtn++04Xniyg44=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
Expand Down
51 changes: 15 additions & 36 deletions haproxy/haproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,15 @@ type HAProxy struct {
opts Options
dataplaneClient *dataplane.Dataplane
consulClient *api.Client
cfgC chan consul.Config
currentCfg *consul.Config

oldState state.State
cfgC chan consul.Config

currentConsulConfig *consul.Config
currentHAProxyState state.State

haConfig *haConfig

Ready chan (struct{})
Ready chan struct{}
}

func New(consulClient *api.Client, cfg chan consul.Config, opts Options) *HAProxy {
Expand All @@ -49,34 +50,7 @@ func New(consulClient *api.Client, cfg chan consul.Config, opts Options) *HAProx
}

func (h *HAProxy) Run(sd *lib.Shutdown) error {
init := false
statsStarted := false
for {
select {
case c := <-h.cfgC:
if !init {
err := h.start(sd)
if err != nil {
return err
}
init = true
close(h.Ready)
}
err := h.handleChange(c)
if err != nil {
log.Error(err)
}
if !statsStarted {
err = h.startStats()
if err != nil {
log.Error(err)
}
statsStarted = true
}
case <-sd.Stop:
return nil
}
}
return h.watch(sd)
}

func (h *HAProxy) start(sd *lib.Shutdown) error {
Expand Down Expand Up @@ -114,6 +88,11 @@ func (h *HAProxy) start(sd *lib.Shutdown) error {
}
h.dataplaneClient = dpc

err = h.startStats()
if err != nil {
log.Error(err)
}

return nil
}

Expand All @@ -138,7 +117,7 @@ func (h *HAProxy) startLogger() error {

func (h *HAProxy) startSPOA() error {
spoeAgent := spoe.New(NewSPOEHandler(h.consulClient, func() consul.Config {
return *h.currentCfg
return *h.currentConsulConfig
}).Handler)

lis, err := net.Listen("unix", h.haConfig.SPOESock)
Expand Down Expand Up @@ -173,8 +152,8 @@ func (h *HAProxy) startStats() error {

reg := func() {
err = h.consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{
ID: fmt.Sprintf("%s-connect-stats", h.currentCfg.ServiceID),
Name: fmt.Sprintf("%s-connect-stats", h.currentCfg.ServiceName),
ID: fmt.Sprintf("%s-connect-stats", h.currentConsulConfig.ServiceID),
Name: fmt.Sprintf("%s-connect-stats", h.currentConsulConfig.ServiceName),
Port: port,
Checks: api.AgentServiceChecks{
&api.AgentServiceCheck{
Expand All @@ -198,7 +177,7 @@ func (h *HAProxy) startStats() error {
}()
go (&Stats{
dpapi: h.dataplaneClient,
service: h.currentCfg.ServiceName,
service: h.currentConsulConfig.ServiceName,
}).Run()
go func() {
http.Handle("/metrics", promhttp.Handler())
Expand Down
148 changes: 122 additions & 26 deletions haproxy/state.go
Original file line number Diff line number Diff line change
@@ -1,43 +1,139 @@
package haproxy

import (
"sync/atomic"
"time"

"github.com/criteo/haproxy-consul-connect/consul"
"github.com/criteo/haproxy-consul-connect/haproxy/state"
"github.com/criteo/haproxy-consul-connect/lib"
log "github.com/sirupsen/logrus"
"gopkg.in/d4l3k/messagediff.v1"
)

func (h *HAProxy) handleChange(cfg consul.Config) error {
log.Info("handling new configuration")

newState, err := state.Generate(state.Options{
EnableIntentions: h.opts.EnableIntentions,
LogRequests: h.opts.LogRequests,
LogSocket: h.haConfig.LogsSock,
SPOEConfigPath: h.haConfig.SPOE,
SPOESocket: h.haConfig.SPOESock,
}, h.haConfig, h.oldState, cfg)
if err != nil {
return err
}
func (h *HAProxy) watch(sd *lib.Shutdown) error {
throttle := time.Tick(50 * time.Millisecond)
currentState := state.State{}
nextState := &atomic.Value{}
next := make(chan struct{}, 1)
dirty := false

tx := h.dataplaneClient.Tnx()
go func() {
for c := range h.cfgC {
select {
case <-sd.Stop:
return
default:
}

log.Debugf("applying new state: %+v", newState)
log.Info("received consul config update")
nextState.Store(c)
h.currentConsulConfig = &c
select {
case next <- struct{}{}:
default:
}
}
}()

err = state.Apply(tx, h.oldState, newState)
if err != nil {
return err
}
go func() {
for range time.Tick(5 * time.Minute) {
select {
case <-sd.Stop:
return
default:
}

dirty = true
}
}()

err = tx.Commit()
if err != nil {
return err
retry := func() {
time.Sleep(3 * time.Second)
select {
case next <- struct{}{}:
default:
}
}

h.oldState = newState
h.currentCfg = &cfg
started := false
for {
for {
select {
case <-sd.Stop:
return nil
case <-next:
}

<-throttle

log.Info("handling new configuration")
if !started {
err := h.start(sd)
if err != nil {
return err
}
started = true
close(h.Ready)
}

if dirty {
log.Info("refreshing haproxy state")
fromHa, err := state.FromHAProxy(h.dataplaneClient)
if err != nil {
log.Errorf("error retrieving haproxy conf: %s", err)
retry()
continue
}
diff, equal := messagediff.PrettyDiff(currentState, fromHa)
if !equal {
log.Errorf("diff found between expected state and haproxy state: %s", diff)
}
currentState = fromHa
dirty = false
}

newConsulCfg := nextState.Load().(consul.Config)

log.Info("state successfully applied")
newState, err := state.Generate(state.Options{
EnableIntentions: h.opts.EnableIntentions,
LogRequests: h.opts.LogRequests,
LogSocket: h.haConfig.LogsSock,
SPOEConfigPath: h.haConfig.SPOE,
SPOESocket: h.haConfig.SPOESock,
}, h.haConfig, currentState, newConsulCfg)
if err != nil {
log.Error(err)
retry()
continue
}

return nil
if currentState.Equal(newState) {
log.Info("no change to apply to haproxy")
continue
}

tx := h.dataplaneClient.Tnx()

log.Debugf("applying new state: %+v", newState)

err = state.Apply(tx, currentState, newState)
if err != nil {
log.Error(err)
retry()
continue
}

err = tx.Commit()
if err != nil {
log.Error(err)
retry()
continue
}

currentState = newState

log.Info("state applied")
}
}
}
10 changes: 9 additions & 1 deletion haproxy/state/state.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package state

import "github.com/haproxytech/models"
import (
"reflect"

"github.com/haproxytech/models"
)

type FrontendFilter struct {
Filter models.Filter
Expand All @@ -25,6 +29,10 @@ type State struct {
Backends []Backend
}

func (s State) Equal(o State) bool {
return reflect.DeepEqual(s, o)
}

func (s State) findBackend(name string) (Backend, bool) {
for _, b := range s.Backends {
if b.Backend.Name == name {
Expand Down

0 comments on commit 556c75e

Please sign in to comment.