Skip to content

Commit

Permalink
[consul] Support multiple instances (#2948)
Browse files Browse the repository at this point in the history
Also adds support for YAML-configured tags on each instance, to
distinguish data from different instances running on the same Agent.
  • Loading branch information
olivielpeau authored Oct 21, 2016
1 parent 4bd2a8a commit 18262e2
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 44 deletions.
68 changes: 39 additions & 29 deletions checks.d/consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# project
from checks import AgentCheck
from utils.containers import hash_mutable

# 3p
import requests
Expand Down Expand Up @@ -39,6 +40,13 @@ def ceili(v):
return int(ceil(v))


# Mutable per-instance state for the Consul check. ConsulCheck keeps one of
# these per configured instance so cached data is not shared across instances.
class ConsulCheckInstanceState(object):
def __init__(self):
# Cached response of the '/v1/agent/self' request; None until first fetched.
self.local_config = None
# datetime of the last local_config fetch, used to expire the cache.
self.last_config_fetch_time = None
# Leader value seen on the previous run, used to detect leadership changes.
self.last_known_leader = None


class ConsulCheck(AgentCheck):
CONSUL_CHECK = 'consul.up'
HEALTH_CHECK = 'consul.check'
Expand All @@ -59,12 +67,8 @@ class ConsulCheck(AgentCheck):

def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances)
if instances is not None and len(instances) > 1:
raise Exception("Consul check only supports one configured instance.")

self._local_config = None
self._last_config_fetch_time = None
self._last_known_leader = None
self._instance_states = defaultdict(lambda: ConsulCheckInstanceState())

def consul_request(self, instance, endpoint):
url = urljoin(instance.get('url'), endpoint)
Expand All @@ -90,42 +94,42 @@ def consul_request(self, instance, endpoint):
return resp.json()

### Consul Config Accessors
def _get_local_config(self, instance):
if not self._local_config or datetime.now() - self._last_config_fetch_time > timedelta(seconds=self.MAX_CONFIG_TTL):
self._local_config = self.consul_request(instance, '/v1/agent/self')
self._last_config_fetch_time = datetime.now()
def _get_local_config(self, instance, instance_state):
if not instance_state.local_config or datetime.now() - instance_state.last_config_fetch_time > timedelta(seconds=self.MAX_CONFIG_TTL):
instance_state.local_config = self.consul_request(instance, '/v1/agent/self')
instance_state.last_config_fetch_time = datetime.now()

return self._local_config
return instance_state.local_config

# Ask the Consul agent configured for this instance who the cluster leader is;
# returns whatever consul_request yields for the '/v1/status/leader' endpoint.
def _get_cluster_leader(self, instance):
return self.consul_request(instance, '/v1/status/leader')

def _get_agent_url(self, instance):
def _get_agent_url(self, instance, instance_state):
self.log.debug("Starting _get_agent_url")
local_config = self._get_local_config(instance)
local_config = self._get_local_config(instance, instance_state)
agent_addr = local_config.get('Config', {}).get('AdvertiseAddr')
agent_port = local_config.get('Config', {}).get('Ports', {}).get('Server')
agent_url = "{0}:{1}".format(agent_addr, agent_port)
self.log.debug("Agent url is %s" % agent_url)
return agent_url

def _get_agent_datacenter(self, instance):
local_config = self._get_local_config(instance)
def _get_agent_datacenter(self, instance, instance_state):
local_config = self._get_local_config(instance, instance_state)
agent_dc = local_config.get('Config', {}).get('Datacenter')
return agent_dc

### Consul Leader Checks
def _is_instance_leader(self, instance):
def _is_instance_leader(self, instance, instance_state):
try:
agent_url = self._get_agent_url(instance)
leader = self._last_known_leader or self._get_cluster_leader(instance)
agent_url = self._get_agent_url(instance, instance_state)
leader = instance_state.last_known_leader or self._get_cluster_leader(instance)
self.log.debug("Consul agent lives at %s . Consul Leader lives at %s" % (agent_url,leader))
return agent_url == leader

except Exception:
return False

def _check_for_leader_change(self, instance):
def _check_for_leader_change(self, instance, instance_state):
perform_new_leader_checks = instance.get('new_leader_checks',
self.init_config.get('new_leader_checks', False))
perform_self_leader_check = instance.get('self_leader_check',
Expand All @@ -148,20 +152,20 @@ def _check_for_leader_change(self, instance):
self.log.warn('Consul Leader information is not available!')
return

if not self._last_known_leader:
if not instance_state.last_known_leader:
# We have no state preserved, store some and return
self._last_known_leader = leader
instance_state.last_known_leader = leader
return

agent = self._get_agent_url(instance)
agent_dc = self._get_agent_datacenter(instance)
agent = self._get_agent_url(instance, instance_state)
agent_dc = self._get_agent_datacenter(instance, instance_state)

if leader != self._last_known_leader:
if leader != instance_state.last_known_leader:
# There was a leadership change
if perform_new_leader_checks or (perform_self_leader_check and agent == leader):
# We either emit all leadership changes or emit when we become the leader and that just happened
self.log.info(('Leader change from {0} to {1}. Sending new leader event').format(
self._last_known_leader, leader))
instance_state.last_known_leader, leader))

self.event({
"timestamp": int(datetime.now().strftime("%s")),
Expand All @@ -173,12 +177,12 @@ def _check_for_leader_change(self, instance):
leader,
agent_dc
),
"tags": ["prev_consul_leader:{0}".format(self._last_known_leader),
"tags": ["prev_consul_leader:{0}".format(instance_state.last_known_leader),
"curr_consul_leader:{0}".format(leader),
"consul_datacenter:{0}".format(agent_dc)]
})

self._last_known_leader = leader
instance_state.last_known_leader = leader

### Consul Catalog Accessors
def get_peers_in_cluster(self, instance):
Expand Down Expand Up @@ -208,16 +212,22 @@ def _cull_services_list(self, services, service_whitelist):
return services

def check(self, instance):
self._check_for_leader_change(instance)
# Instance state is mutable, any changes to it will be reflected in self._instance_states
instance_state = self._instance_states[hash_mutable(instance)]

self._check_for_leader_change(instance, instance_state)

peers = self.get_peers_in_cluster(instance)
main_tags = []
agent_dc = self._get_agent_datacenter(instance)
agent_dc = self._get_agent_datacenter(instance, instance_state)

if agent_dc is not None:
main_tags.append('consul_datacenter:{0}'.format(agent_dc))

if not self._is_instance_leader(instance):
for tag in instance.get('tags', []):
main_tags.append(tag)

if not self._is_instance_leader(instance, instance_state):
self.gauge("consul.peers", len(peers), tags=main_tags + ["mode:follower"])
self.log.debug("This consul agent is not the cluster leader." +
"Skipping service and catalog checks for this instance")
Expand Down
14 changes: 10 additions & 4 deletions conf.d/consul.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ instances:
# Whether to perform checks against the Consul service Catalog
catalog_checks: yes

# Whether to enable self leader checks. Each agent with this enabled will
# Whether to enable self leader checks. Each instance with this enabled will
# watch for itself to become the leader and will emit an event when that
# happens. It is safe/expected to enable this on all nodes in a consul
# cluster since only the new leader will emit the (single) event. This
# flag takes precedence over new_leader_checks.
self_leader_check: yes

# Whether to enable new leader checks from this agent
# Note: if this is set on multiple agents in the same cluster
# you will receive one event per leader change per agent. See
# Whether to enable new leader checks from this instance.
# Note: if this is set on multiple instances/agents in the same cluster,
# you will receive one event per leader change per instance. See
# self_leader_check for a more robust option.
new_leader_checks: yes

Expand All @@ -50,3 +50,9 @@ instances:
# - zookeeper
# - gunicorn
# - redis

# Additional tags to apply to the metrics, events and service checks.
# You should always specify tags when multiple instances of the check run
# on the same agent.
# tags:
# - 'consul_server:server1'
23 changes: 12 additions & 11 deletions tests/checks/mock/test_consul.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import random

from tests.checks.common import AgentCheckTest, load_check
from utils.containers import hash_mutable

MOCK_CONFIG = {
'init_config': {},
Expand Down Expand Up @@ -103,7 +104,7 @@ def mock_get_n_services_in_cluster(self, n):
dct[k] = []
return dct

def mock_get_local_config(self, instance):
def mock_get_local_config(self, instance, instance_state):
return {
"Config": {
"AdvertiseAddr": "10.0.2.15",
Expand Down Expand Up @@ -374,9 +375,6 @@ def _get_consul_mocks(self):
'_get_coord_nodes': self.mock_get_coord_nodes,
}

def test_bad_config(self):
self.assertRaises(Exception, self.run_check, MOCK_BAD_CONFIG)

def test_get_nodes_with_service(self):
self.run_check(MOCK_CONFIG, mocks=self._get_consul_mocks())
self.assertMetric('consul.catalog.nodes_up', value=1, tags=['consul_datacenter:dc1', 'consul_service_id:service-1'])
Expand Down Expand Up @@ -462,7 +460,8 @@ def test_cull_services_list(self):

def test_new_leader_event(self):
self.check = load_check(self.CHECK_NAME, MOCK_CONFIG_LEADER_CHECK, self.DEFAULT_AGENT_CONFIG)
self.check._last_known_leader = 'My Old Leader'
instance_hash = hash_mutable(MOCK_CONFIG_LEADER_CHECK['instances'][0])
self.check._instance_states[instance_hash].last_known_leader = 'My Old Leader'

mocks = self._get_consul_mocks()
mocks['_get_cluster_leader'] = self.mock_get_cluster_leader_B
Expand All @@ -477,7 +476,8 @@ def test_new_leader_event(self):

def test_self_leader_event(self):
self.check = load_check(self.CHECK_NAME, MOCK_CONFIG_SELF_LEADER_CHECK, self.DEFAULT_AGENT_CONFIG)
self.check._last_known_leader = 'My Old Leader'
instance_hash = hash_mutable(MOCK_CONFIG_SELF_LEADER_CHECK['instances'][0])
self.check._instance_states[instance_hash].last_known_leader = 'My Old Leader'

mocks = self._get_consul_mocks()

Expand All @@ -488,7 +488,7 @@ def test_self_leader_event(self):
mocks['_get_cluster_leader'] = self.mock_get_cluster_leader_A
self.run_check(MOCK_CONFIG_SELF_LEADER_CHECK, mocks=mocks)
self.assertEqual(len(self.events), 1)
self.assertEqual(our_url, self.check._last_known_leader)
self.assertEqual(our_url, self.check._instance_states[instance_hash].last_known_leader)
event = self.events[0]
self.assertEqual(event['event_type'], 'consul.new_leader')
self.assertIn('prev_consul_leader:My Old Leader', event['tags'])
Expand All @@ -502,13 +502,13 @@ def test_self_leader_event(self):
mocks['_get_cluster_leader'] = self.mock_get_cluster_leader_B
self.run_check(MOCK_CONFIG_SELF_LEADER_CHECK, mocks=mocks)
self.assertEqual(len(self.events), 0)
self.assertEqual(other_url, self.check._last_known_leader)
self.assertEqual(other_url, self.check._instance_states[instance_hash].last_known_leader)

# We regain the leadership
mocks['_get_cluster_leader'] = self.mock_get_cluster_leader_A
self.run_check(MOCK_CONFIG_SELF_LEADER_CHECK, mocks=mocks)
self.assertEqual(len(self.events), 1)
self.assertEqual(our_url, self.check._last_known_leader)
self.assertEqual(our_url, self.check._instance_states[instance_hash].last_known_leader)
event = self.events[0]
self.assertEqual(event['event_type'], 'consul.new_leader')
self.assertIn('prev_consul_leader:%s' % other_url, event['tags'])
Expand All @@ -521,9 +521,10 @@ def test_network_latency_checks(self):
mocks = self._get_consul_mocks()

# We start out as the leader, and stay that way
self.check._last_known_leader = self.mock_get_cluster_leader_A(None)
instance_hash = hash_mutable(MOCK_CONFIG_NETWORK_LATENCY_CHECKS['instances'][0])
self.check._instance_states[instance_hash].last_known_leader = self.mock_get_cluster_leader_A(None)

self.run_check(MOCK_CONFIG_SELF_LEADER_CHECK, mocks=mocks)
self.run_check(MOCK_CONFIG_NETWORK_LATENCY_CHECKS, mocks=mocks)

latency = [m for m in self.metrics if m[0].startswith('consul.net.')]
latency.sort()
Expand Down

0 comments on commit 18262e2

Please sign in to comment.