Skip to content

Commit

Permalink
Performance & Metrics changes
Browse files Browse the repository at this point in the history
Changes that Bloomberg have been using in production to support 3,000+
nodes with on 100 ESM's.

 - Adds metrics for health checks and performance
 - Adds support for > 64 ESM Instances
 - Improvements to allow a checks to run ever 1s safely
 - Improvements to prevent spurious node status updates
 - Improvements to allow disabling node coordinate updates
 - Fix an issue where check updates can fail to propagate to ESM Followers
 - Extra Logging
 - Exposed new config options
 - Adds ability to skip node coordinate updates if they are small (reducing spurious updates)
 - Updated to go mod for vendored dependencies
  • Loading branch information
mikeyyuen authored and lornasong committed Apr 30, 2020
1 parent 07c8a75 commit 950800c
Show file tree
Hide file tree
Showing 11 changed files with 628 additions and 174 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ _testmain.go
*.test
.proc
*.conf
consul-esm

# IDE files
*.iml
Expand Down Expand Up @@ -55,4 +56,4 @@ Icon
.AppleDesktop
Network Trash Folder
Temporary Items
.apdisk
.apdisk
67 changes: 59 additions & 8 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ import (
"sync"
"time"

"github.com/armon/go-metrics"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-uuid"
)

Expand All @@ -28,8 +30,24 @@ var (
// deregisterTime is the time the TTL check must be in a failed state for
// the ESM service in consul to be deregistered.
deregisterTime = 30 * time.Minute

// Specifies a minimum interval that check's can run on
minimumInterval = 1 * time.Second

// Specifies the maximum transaction size for kv store ops
maximumTransactionSize = 64
)

type lastKnownStatus struct {
status string
time time.Time
}

func (s lastKnownStatus) isExpired(ttl time.Duration) bool {
statusAge := time.Now().Sub(s.time)
return statusAge >= ttl
}

type Agent struct {
config *Config
client *api.Client
Expand All @@ -44,7 +62,11 @@ type Agent struct {
inflightLock sync.Mutex

// Custom func to hook into for testing.
watchedNodeFunc func(map[string]bool, []*api.Node)
watchedNodeFunc func(map[string]bool, []*api.Node)
knownNodeStatuses map[string]lastKnownStatus
knwonNodeStatusesLock sync.Mutex

memSink *metrics.InmemSink
}

func NewAgent(config *Config, logger *log.Logger) (*Agent, error) {
Expand All @@ -64,13 +86,20 @@ func NewAgent(config *Config, logger *log.Logger) (*Agent, error) {
}
}

memSink, err := lib.InitTelemetry(config.Telemetry)
if err != nil {
return nil, err
}

agent := Agent{
config: config,
client: client,
id: id,
logger: logger,
shutdownCh: make(chan struct{}),
inflightPings: make(map[string]struct{}),
config: config,
client: client,
id: id,
logger: logger,
shutdownCh: make(chan struct{}),
inflightPings: make(map[string]struct{}),
knownNodeStatuses: make(map[string]lastKnownStatus),
memSink: memSink,
}

logger.Printf("[INFO] Connecting to Consul on %s...", clientConf.Address)
Expand Down Expand Up @@ -308,6 +337,8 @@ func (a *Agent) watchNodeList() {
continue
}

a.logger.Printf("[INFO] Fetched %d nodes from catalog", len(nodes))

var pingList []*api.Node
for _, node := range nodes {
if pingNodes[node.Node] {
Expand Down Expand Up @@ -344,7 +375,7 @@ func (a *Agent) watchHealthChecks(nodeListCh chan map[string]bool) {

// Start a check runner to track and run the health checks we're responsible for and call
// UpdateChecks when we get an update from watchHealthChecks.
a.checkRunner = NewCheckRunner(a.logger, a.client, a.config.CheckUpdateInterval)
a.checkRunner = NewCheckRunner(a.logger, a.client, a.config.CheckUpdateInterval, minimumInterval)
go a.checkRunner.reapServices(a.shutdownCh)
defer a.checkRunner.Stop()

Expand Down Expand Up @@ -387,3 +418,23 @@ func (a *Agent) watchHealthChecks(nodeListCh chan map[string]bool) {
}
}
}

// Check last visible node status.
// Returns true, if status is changed since last update and false otherwise.
func (a *Agent) shouldUpdateNodeStatus(node string, newStatus string) bool {
a.knwonNodeStatusesLock.Lock()
defer a.knwonNodeStatusesLock.Unlock()
ttl := a.config.NodeHealthRefreshInterval
lastStatus, exists := a.knownNodeStatuses[node]
if !exists || lastStatus.isExpired(ttl) {
return true
}
return newStatus != lastStatus.status
}

// Update last visible node status.
func (a *Agent) updateLastKnownNodeStatus(node string, newStatus string) {
a.knwonNodeStatusesLock.Lock()
defer a.knwonNodeStatusesLock.Unlock()
a.knownNodeStatuses[node] = lastKnownStatus{newStatus, time.Now()}
}
Loading

0 comments on commit 950800c

Please sign in to comment.