diff --git a/go.mod b/go.mod index ff29c61..7b363e0 100644 --- a/go.mod +++ b/go.mod @@ -5,6 +5,7 @@ go 1.12 require ( github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a // indirect github.com/criteo/haproxy-spoe-go v0.0.0-20190925130734-97891c13d324 + github.com/d4l3k/messagediff v1.2.1 // indirect github.com/docker/go-units v0.4.0 // indirect github.com/facebookgo/freeport v0.0.0-20150612182905-d4adf43b75b9 github.com/go-openapi/analysis v0.19.0 // indirect @@ -25,5 +26,6 @@ require ( golang.org/x/net v0.0.0-20190607181551-461777fb6f67 // indirect golang.org/x/sys v0.0.0-20190528012530-adf421d2caf4 // indirect golang.org/x/text v0.3.2 // indirect + gopkg.in/d4l3k/messagediff.v1 v1.2.1 gopkg.in/mcuadros/go-syslog.v2 v2.2.1 ) diff --git a/go.sum b/go.sum index 3d70b2c..0fb868e 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,8 @@ github.com/coredns/coredns v1.1.2 h1:bAFHrSsBeTeRG5W3Nf2su3lUGw7Npw2UKeCJm/3A638 github.com/coredns/coredns v1.1.2/go.mod h1:zASH/MVDgR6XZTbxvOnsZfffS+31vg6Ackf/wo1+AM0= github.com/criteo/haproxy-spoe-go v0.0.0-20190925130734-97891c13d324 h1:EG4AakHHowlW2TkSX6URMubHsmRjd0HWl4LtI4pD7WA= github.com/criteo/haproxy-spoe-go v0.0.0-20190925130734-97891c13d324/go.mod h1:3h7I0HgdYy7SIlcSLEUVLpFTfHA0V4qK6QsQEKNLRkI= +github.com/d4l3k/messagediff v1.2.1 h1:ZcAIMYsUg0EAp9X+tt8/enBE/Q8Yd5kzPynLyKptt9U= +github.com/d4l3k/messagediff v1.2.1/go.mod h1:Oozbb1TVXFac9FtSIxHBMnBCq2qeH/2KkEQxENCrlLo= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -467,6 +469,8 @@ gopkg.in/asn1-ber.v1 v1.0.0-20181015200546-f715ec2f112d/go.mod h1:cuepJuh7vyXfUy gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/d4l3k/messagediff.v1 v1.2.1 h1:70AthpjunwzUiarMHyED52mj9UwtAnE89l1Gmrt3EU0= +gopkg.in/d4l3k/messagediff.v1 v1.2.1/go.mod h1:EUzikiKadqXWcD1AzJLagx0j/BeeWGtn++04Xniyg44= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= diff --git a/haproxy/haproxy.go b/haproxy/haproxy.go index 36c136c..cdda754 100644 --- a/haproxy/haproxy.go +++ b/haproxy/haproxy.go @@ -23,14 +23,15 @@ type HAProxy struct { opts Options dataplaneClient *dataplane.Dataplane consulClient *api.Client - cfgC chan consul.Config - currentCfg *consul.Config - oldState state.State + cfgC chan consul.Config + + currentConsulConfig *consul.Config + currentHAProxyState state.State haConfig *haConfig - Ready chan (struct{}) + Ready chan struct{} } func New(consulClient *api.Client, cfg chan consul.Config, opts Options) *HAProxy { @@ -49,34 +50,7 @@ func New(consulClient *api.Client, cfg chan consul.Config, opts Options) *HAProx } func (h *HAProxy) Run(sd *lib.Shutdown) error { - init := false - statsStarted := false - for { - select { - case c := <-h.cfgC: - if !init { - err := h.start(sd) - if err != nil { - return err - } - init = true - close(h.Ready) - } - err := h.handleChange(c) - if err != nil { - log.Error(err) - } - if !statsStarted { - err = h.startStats() - if err != nil { - log.Error(err) - } - statsStarted = true - } - case <-sd.Stop: - return nil - } - } + return h.watch(sd) } func (h *HAProxy) start(sd *lib.Shutdown) error { @@ -114,6 +88,11 @@ func (h *HAProxy) start(sd *lib.Shutdown) error { } h.dataplaneClient = dpc + err = h.startStats() + if err != nil { + log.Error(err) + } + return nil } @@ -138,7 +117,7 @@ func (h *HAProxy) startLogger() error { func (h *HAProxy) startSPOA() error { spoeAgent := spoe.New(NewSPOEHandler(h.consulClient, func() consul.Config { - return *h.currentCfg + return *h.currentConsulConfig }).Handler) lis, err := net.Listen("unix", h.haConfig.SPOESock) @@ -173,8 +152,8 @@ func (h *HAProxy) startStats() error { reg := func() { err = h.consulClient.Agent().ServiceRegister(&api.AgentServiceRegistration{ - ID: fmt.Sprintf("%s-connect-stats", h.currentCfg.ServiceID), - Name: fmt.Sprintf("%s-connect-stats", h.currentCfg.ServiceName), + ID: fmt.Sprintf("%s-connect-stats", h.currentConsulConfig.ServiceID), + Name: fmt.Sprintf("%s-connect-stats", h.currentConsulConfig.ServiceName), Port: port, Checks: api.AgentServiceChecks{ &api.AgentServiceCheck{ @@ -198,7 +177,7 @@ func (h *HAProxy) startStats() error { }() go (&Stats{ dpapi: h.dataplaneClient, - service: h.currentCfg.ServiceName, + service: h.currentConsulConfig.ServiceName, }).Run() go func() { http.Handle("/metrics", promhttp.Handler()) diff --git a/haproxy/state.go b/haproxy/state.go index fad7bb8..841e6ea 100644 --- a/haproxy/state.go +++ b/haproxy/state.go @@ -1,43 +1,139 @@ package haproxy import ( + "sync/atomic" + "time" + "github.com/criteo/haproxy-consul-connect/consul" "github.com/criteo/haproxy-consul-connect/haproxy/state" + "github.com/criteo/haproxy-consul-connect/lib" log "github.com/sirupsen/logrus" + "gopkg.in/d4l3k/messagediff.v1" ) -func (h *HAProxy) handleChange(cfg consul.Config) error { - log.Info("handling new configuration") - - newState, err := state.Generate(state.Options{ - EnableIntentions: h.opts.EnableIntentions, - LogRequests: h.opts.LogRequests, - LogSocket: h.haConfig.LogsSock, - SPOEConfigPath: h.haConfig.SPOE, - SPOESocket: h.haConfig.SPOESock, - }, h.haConfig, h.oldState, cfg) - if err != nil { - return err - } +func (h *HAProxy) watch(sd *lib.Shutdown) error { + throttle := time.Tick(50 * time.Millisecond) + currentState := state.State{} + nextState := &atomic.Value{} + next := make(chan struct{}, 1) + dirty := false - tx := h.dataplaneClient.Tnx() + go func() { + for c := range h.cfgC { + select { + case <-sd.Stop: + return + default: + } - log.Debugf("applying new state: %+v", newState) + log.Info("received consul config update") + nextState.Store(c) + h.currentConsulConfig = &c + select { + case next <- struct{}{}: + default: + } + } + }() - err = state.Apply(tx, h.oldState, newState) - if err != nil { - return err - } + go func() { + for range time.Tick(5 * time.Minute) { + select { + case <-sd.Stop: + return + default: + } + + dirty = true + } + }() - err = tx.Commit() - if err != nil { - return err + retry := func() { + time.Sleep(3 * time.Second) + select { + case next <- struct{}{}: + default: + } } - h.oldState = newState - h.currentCfg = &cfg + started := false + for { + for { + select { + case <-sd.Stop: + return nil + case <-next: + } + + <-throttle + + log.Info("handling new configuration") + if !started { + err := h.start(sd) + if err != nil { + return err + } + started = true + close(h.Ready) + } + + if dirty { + log.Info("refreshing haproxy state") + fromHa, err := state.FromHAProxy(h.dataplaneClient) + if err != nil { + log.Errorf("error retrieving haproxy conf: %s", err) + retry() + continue + } + diff, equal := messagediff.PrettyDiff(currentState, fromHa) + if !equal { + log.Errorf("diff found between expected state and haproxy state: %s", diff) + } + currentState = fromHa + dirty = false + } + + newConsulCfg := nextState.Load().(consul.Config) - log.Info("state successfully applied") + newState, err := state.Generate(state.Options{ + EnableIntentions: h.opts.EnableIntentions, + LogRequests: h.opts.LogRequests, + LogSocket: h.haConfig.LogsSock, + SPOEConfigPath: h.haConfig.SPOE, + SPOESocket: h.haConfig.SPOESock, + }, h.haConfig, currentState, newConsulCfg) + if err != nil { + log.Error(err) + retry() + continue + } - return nil + if currentState.Equal(newState) { + log.Info("no change to apply to haproxy") + continue + } + + tx := h.dataplaneClient.Tnx() + + log.Debugf("applying new state: %+v", newState) + + err = state.Apply(tx, currentState, newState) + if err != nil { + log.Error(err) + retry() + continue + } + + err = tx.Commit() + if err != nil { + log.Error(err) + retry() + continue + } + + currentState = newState + + log.Info("state applied") + } + } } diff --git a/haproxy/state/state.go b/haproxy/state/state.go index f357739..c60ae6d 100644 --- a/haproxy/state/state.go +++ b/haproxy/state/state.go @@ -1,6 +1,10 @@ package state -import "github.com/haproxytech/models" +import ( + "reflect" + + "github.com/haproxytech/models" +) type FrontendFilter struct { Filter models.Filter @@ -25,6 +29,10 @@ type State struct { Backends []Backend } +func (s State) Equal(o State) bool { + return reflect.DeepEqual(s, o) +} + func (s State) findBackend(name string) (Backend, bool) { for _, b := range s.Backends { if b.Backend.Name == name {