Skip to content

Commit

Permalink
consul: refactor service monitor
Browse files Browse the repository at this point in the history
Refactor the set of functions which watch the consul state
and generate the route commands into a set of objects to make
them testable and extendable.
  • Loading branch information
magiconair committed Nov 15, 2018
1 parent 44d6dec commit 9096f2a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 22 deletions.
3 changes: 2 additions & 1 deletion registry/consul/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,9 @@ func (b *be) WatchServices() chan string {
log.Printf("[INFO] consul: Using dynamic routes")
log.Printf("[INFO] consul: Using tag prefix %q", b.cfg.TagPrefix)

m := NewServiceMonitor(b.c, b.cfg, b.dc)
svc := make(chan string)
go watchServices(b.c, b.cfg, svc)
go m.Watch(svc)
return svc
}

Expand Down
58 changes: 37 additions & 21 deletions registry/consul/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,52 @@ import (
"github.com/hashicorp/consul/api"
)

// watchServices monitors the consul health checks and creates a new configuration
// on every change.
func watchServices(client *api.Client, config *config.Consul, svcConfig chan string) {
var lastIndex uint64
var strict bool = strings.EqualFold("all", config.ChecksRequired)
// ServiceMonitor generates fabio configurations from consul state.
type ServiceMonitor struct {
client *api.Client
config *config.Consul
dc string
strict bool
}

func NewServiceMonitor(client *api.Client, config *config.Consul, dc string) *ServiceMonitor {
return &ServiceMonitor{
client: client,
config: config,
dc: dc,
strict: config.ChecksRequired == "all",
}
}

// Watch monitors the consul health checks and sends a new
// configuration to the updates channnel on every change.
func (w *ServiceMonitor) Watch(updates chan string) {
var lastIndex uint64
for {
q := &api.QueryOptions{RequireConsistent: true, WaitIndex: lastIndex}
checks, meta, err := client.Health().State("any", q)
checks, meta, err := w.client.Health().State("any", q)
if err != nil {
log.Printf("[WARN] consul: Error fetching health state. %v", err)
time.Sleep(time.Second)
continue
}

log.Printf("[DEBUG] consul: Health changed to #%d", meta.LastIndex)
svcConfig <- servicesConfig(client, passingServices(checks, config.ServiceStatus, strict), config.TagPrefix)

// determine which services have passing health checks
passing := passingServices(checks, w.config.ServiceStatus, w.strict)

// build the config for the passing services
updates <- w.makeConfig(passing)

// remember the last state and wait for the next change
lastIndex = meta.LastIndex
}
}

// servicesConfig determines which service instances have passing health checks
// makeCconfig determines which service instances have passing health checks
// and then finds the ones which have tags with the right prefix to build the config from.
func servicesConfig(client *api.Client, checks []*api.HealthCheck, tagPrefix string) string {
func (w *ServiceMonitor) makeConfig(checks []*api.HealthCheck) string {
// map service name to list of service passing for which the health check is ok
m := map[string]map[string]bool{}
for _, check := range checks {
Expand All @@ -54,7 +76,7 @@ func servicesConfig(client *api.Client, checks []*api.HealthCheck, tagPrefix str

var config []string
for name, passing := range m {
cfg := serviceConfig(client, name, passing, tagPrefix)
cfg := w.serviceConfig(name, passing)
config = append(config, cfg...)
}

Expand All @@ -65,26 +87,20 @@ func servicesConfig(client *api.Client, checks []*api.HealthCheck, tagPrefix str
}

// serviceConfig constructs the config for all good instances of a single service.
func serviceConfig(client *api.Client, name string, passing map[string]bool, tagPrefix string) (config []string) {
func (w *ServiceMonitor) serviceConfig(name string, passing map[string]bool) (config []string) {
if name == "" || len(passing) == 0 {
return nil
}

dc, err := datacenter(client)
if err != nil {
log.Printf("[WARN] consul: Error getting datacenter. %s", err)
return nil
}

q := &api.QueryOptions{RequireConsistent: true}
svcs, _, err := client.Catalog().Service(name, "", q)
svcs, _, err := w.client.Catalog().Service(name, "", q)
if err != nil {
log.Printf("[WARN] consul: Error getting catalog service %s. %v", name, err)
return nil
}

env := map[string]string{
"DC": dc,
"DC": w.dc,
}

for _, svc := range svcs {
Expand All @@ -97,14 +113,14 @@ func serviceConfig(client *api.Client, name string, passing map[string]bool, tag
// get all tags which do not have the tag prefix
var svctags []string
for _, tag := range svc.ServiceTags {
if !strings.HasPrefix(tag, tagPrefix) {
if !strings.HasPrefix(tag, w.config.TagPrefix) {
svctags = append(svctags, tag)
}
}

// generate route commands
for _, tag := range svc.ServiceTags {
if route, opts, ok := parseURLPrefixTag(tag, tagPrefix, env); ok {
if route, opts, ok := parseURLPrefixTag(tag, w.config.TagPrefix, env); ok {
name, addr, port := svc.ServiceName, svc.ServiceAddress, svc.ServicePort

// use consul node address if service address is not set
Expand Down

0 comments on commit 9096f2a

Please sign in to comment.