Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[consul] Support multiple instances #2948

Merged
merged 1 commit into from
Oct 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 39 additions & 29 deletions checks.d/consul.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

# project
from checks import AgentCheck
from utils.containers import hash_mutable

# 3p
import requests
Expand Down Expand Up @@ -39,6 +40,13 @@ def ceili(v):
return int(ceil(v))


class ConsulCheckInstanceState(object):
    """Mutable per-instance state the Consul check keeps between runs.

    One of these lives per configured instance (keyed by the instance's
    hash), replacing the old check-wide attributes so several instances
    no longer clobber each other's cached config and leader tracking.
    """

    def __init__(self):
        # local_config: cached /v1/agent/self payload;
        # last_config_fetch_time: when that cache was refreshed;
        # last_known_leader: previous leader, for change detection.
        for attr in ('local_config', 'last_config_fetch_time', 'last_known_leader'):
            setattr(self, attr, None)


class ConsulCheck(AgentCheck):
CONSUL_CHECK = 'consul.up'
HEALTH_CHECK = 'consul.check'
Expand All @@ -59,12 +67,8 @@ class ConsulCheck(AgentCheck):

def __init__(self, name, init_config, agentConfig, instances=None):
AgentCheck.__init__(self, name, init_config, agentConfig, instances)
if instances is not None and len(instances) > 1:
raise Exception("Consul check only supports one configured instance.")

self._local_config = None
self._last_config_fetch_time = None
self._last_known_leader = None
self._instance_states = defaultdict(lambda: ConsulCheckInstanceState())

def consul_request(self, instance, endpoint):
url = urljoin(instance.get('url'), endpoint)
Expand All @@ -90,42 +94,42 @@ def consul_request(self, instance, endpoint):
return resp.json()

### Consul Config Accessors
def _get_local_config(self, instance):
if not self._local_config or datetime.now() - self._last_config_fetch_time > timedelta(seconds=self.MAX_CONFIG_TTL):
self._local_config = self.consul_request(instance, '/v1/agent/self')
self._last_config_fetch_time = datetime.now()
def _get_local_config(self, instance, instance_state):
if not instance_state.local_config or datetime.now() - instance_state.last_config_fetch_time > timedelta(seconds=self.MAX_CONFIG_TTL):
instance_state.local_config = self.consul_request(instance, '/v1/agent/self')
instance_state.last_config_fetch_time = datetime.now()

return self._local_config
return instance_state.local_config

def _get_cluster_leader(self, instance):
# Query the agent for the current cluster leader. The value is compared
# directly against the "addr:port" built in _get_agent_url, so it is
# presumably the leader's address string — confirm against the Consul
# /v1/status/leader endpoint docs.
return self.consul_request(instance, '/v1/status/leader')

def _get_agent_url(self, instance):
def _get_agent_url(self, instance, instance_state):
self.log.debug("Starting _get_agent_url")
local_config = self._get_local_config(instance)
local_config = self._get_local_config(instance, instance_state)
agent_addr = local_config.get('Config', {}).get('AdvertiseAddr')
agent_port = local_config.get('Config', {}).get('Ports', {}).get('Server')
agent_url = "{0}:{1}".format(agent_addr, agent_port)
self.log.debug("Agent url is %s" % agent_url)
return agent_url

def _get_agent_datacenter(self, instance):
local_config = self._get_local_config(instance)
def _get_agent_datacenter(self, instance, instance_state):
# Return the datacenter this agent belongs to, read from the (cached)
# /v1/agent/self config; None when the 'Config'/'Datacenter' keys are
# missing from the response.
local_config = self._get_local_config(instance, instance_state)
agent_dc = local_config.get('Config', {}).get('Datacenter')
return agent_dc

### Consul Leader Checks
def _is_instance_leader(self, instance):
def _is_instance_leader(self, instance, instance_state):
try:
agent_url = self._get_agent_url(instance)
leader = self._last_known_leader or self._get_cluster_leader(instance)
agent_url = self._get_agent_url(instance, instance_state)
leader = instance_state.last_known_leader or self._get_cluster_leader(instance)
self.log.debug("Consul agent lives at %s . Consul Leader lives at %s" % (agent_url,leader))
return agent_url == leader

except Exception:
return False

def _check_for_leader_change(self, instance):
def _check_for_leader_change(self, instance, instance_state):
perform_new_leader_checks = instance.get('new_leader_checks',
self.init_config.get('new_leader_checks', False))
perform_self_leader_check = instance.get('self_leader_check',
Expand All @@ -148,20 +152,20 @@ def _check_for_leader_change(self, instance):
self.log.warn('Consul Leader information is not available!')
return

if not self._last_known_leader:
if not instance_state.last_known_leader:
# We have no state preserved, store some and return
self._last_known_leader = leader
instance_state.last_known_leader = leader
return

agent = self._get_agent_url(instance)
agent_dc = self._get_agent_datacenter(instance)
agent = self._get_agent_url(instance, instance_state)
agent_dc = self._get_agent_datacenter(instance, instance_state)

if leader != self._last_known_leader:
if leader != instance_state.last_known_leader:
# There was a leadership change
if perform_new_leader_checks or (perform_self_leader_check and agent == leader):
# We either emit all leadership changes or emit when we become the leader and that just happened
self.log.info(('Leader change from {0} to {1}. Sending new leader event').format(
self._last_known_leader, leader))
instance_state.last_known_leader, leader))

self.event({
"timestamp": int(datetime.now().strftime("%s")),
Expand All @@ -173,12 +177,12 @@ def _check_for_leader_change(self, instance):
leader,
agent_dc
),
"tags": ["prev_consul_leader:{0}".format(self._last_known_leader),
"tags": ["prev_consul_leader:{0}".format(instance_state.last_known_leader),
"curr_consul_leader:{0}".format(leader),
"consul_datacenter:{0}".format(agent_dc)]
})

self._last_known_leader = leader
instance_state.last_known_leader = leader

### Consul Catalog Accessors
def get_peers_in_cluster(self, instance):
Expand Down Expand Up @@ -208,16 +212,22 @@ def _cull_services_list(self, services, service_whitelist):
return services

def check(self, instance):
self._check_for_leader_change(instance)
# Instance state is mutable, any changes to it will be reflected in self._instance_states
instance_state = self._instance_states[hash_mutable(instance)]

self._check_for_leader_change(instance, instance_state)

peers = self.get_peers_in_cluster(instance)
main_tags = []
agent_dc = self._get_agent_datacenter(instance)
agent_dc = self._get_agent_datacenter(instance, instance_state)

if agent_dc is not None:
main_tags.append('consul_datacenter:{0}'.format(agent_dc))

if not self._is_instance_leader(instance):
for tag in instance.get('tags', []):
main_tags.append(tag)

if not self._is_instance_leader(instance, instance_state):
self.gauge("consul.peers", len(peers), tags=main_tags + ["mode:follower"])
self.log.debug("This consul agent is not the cluster leader." +
"Skipping service and catalog checks for this instance")
Expand Down
14 changes: 10 additions & 4 deletions conf.d/consul.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ instances:
# Whether to perform checks against the Consul service Catalog
catalog_checks: yes

# Whether to enable self leader checks. Each agent with this enabled will
# Whether to enable self leader checks. Each instance with this enabled will
# watch for itself to become the leader and will emit an event when that
# happens. It is safe/expected to enable this on all nodes in a consul
# cluster since only the new leader will emit the (single) event. This
# flag takes precedence over new_leader_checks.
self_leader_check: yes

# Whether to enable new leader checks from this agent
# Note: if this is set on multiple agents in the same cluster
# you will receive one event per leader change per agent. See
# Whether to enable new leader checks from this instance
# Note: if this is set on multiple instances/agents in the same cluster
# you will receive one event per leader change per instance. See
# self_leader_check for a more robust option.
new_leader_checks: yes

Expand All @@ -50,3 +50,9 @@ instances:
# - zookeeper
# - gunicorn
# - redis

# Additional tags to apply to the metrics, events and service checks
# You should always specify tags when multiple instances of the check run
# on the same agent.
# tags:
# - 'consul_server:server1'
23 changes: 12 additions & 11 deletions tests/checks/mock/test_consul.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import random

from tests.checks.common import AgentCheckTest, load_check
from utils.containers import hash_mutable

MOCK_CONFIG = {
'init_config': {},
Expand Down Expand Up @@ -103,7 +104,7 @@ def mock_get_n_services_in_cluster(self, n):
dct[k] = []
return dct

def mock_get_local_config(self, instance):
def mock_get_local_config(self, instance, instance_state):
return {
"Config": {
"AdvertiseAddr": "10.0.2.15",
Expand Down Expand Up @@ -374,9 +375,6 @@ def _get_consul_mocks(self):
'_get_coord_nodes': self.mock_get_coord_nodes,
}

def test_bad_config(self):
self.assertRaises(Exception, self.run_check, MOCK_BAD_CONFIG)

def test_get_nodes_with_service(self):
self.run_check(MOCK_CONFIG, mocks=self._get_consul_mocks())
self.assertMetric('consul.catalog.nodes_up', value=1, tags=['consul_datacenter:dc1', 'consul_service_id:service-1'])
Expand Down Expand Up @@ -462,7 +460,8 @@ def test_cull_services_list(self):

def test_new_leader_event(self):
self.check = load_check(self.CHECK_NAME, MOCK_CONFIG_LEADER_CHECK, self.DEFAULT_AGENT_CONFIG)
self.check._last_known_leader = 'My Old Leader'
instance_hash = hash_mutable(MOCK_CONFIG_LEADER_CHECK['instances'][0])
self.check._instance_states[instance_hash].last_known_leader = 'My Old Leader'

mocks = self._get_consul_mocks()
mocks['_get_cluster_leader'] = self.mock_get_cluster_leader_B
Expand All @@ -477,7 +476,8 @@ def test_new_leader_event(self):

def test_self_leader_event(self):
self.check = load_check(self.CHECK_NAME, MOCK_CONFIG_SELF_LEADER_CHECK, self.DEFAULT_AGENT_CONFIG)
self.check._last_known_leader = 'My Old Leader'
instance_hash = hash_mutable(MOCK_CONFIG_SELF_LEADER_CHECK['instances'][0])
self.check._instance_states[instance_hash].last_known_leader = 'My Old Leader'

mocks = self._get_consul_mocks()

Expand All @@ -488,7 +488,7 @@ def test_self_leader_event(self):
mocks['_get_cluster_leader'] = self.mock_get_cluster_leader_A
self.run_check(MOCK_CONFIG_SELF_LEADER_CHECK, mocks=mocks)
self.assertEqual(len(self.events), 1)
self.assertEqual(our_url, self.check._last_known_leader)
self.assertEqual(our_url, self.check._instance_states[instance_hash].last_known_leader)
event = self.events[0]
self.assertEqual(event['event_type'], 'consul.new_leader')
self.assertIn('prev_consul_leader:My Old Leader', event['tags'])
Expand All @@ -502,13 +502,13 @@ def test_self_leader_event(self):
mocks['_get_cluster_leader'] = self.mock_get_cluster_leader_B
self.run_check(MOCK_CONFIG_SELF_LEADER_CHECK, mocks=mocks)
self.assertEqual(len(self.events), 0)
self.assertEqual(other_url, self.check._last_known_leader)
self.assertEqual(other_url, self.check._instance_states[instance_hash].last_known_leader)

# We regain the leadership
mocks['_get_cluster_leader'] = self.mock_get_cluster_leader_A
self.run_check(MOCK_CONFIG_SELF_LEADER_CHECK, mocks=mocks)
self.assertEqual(len(self.events), 1)
self.assertEqual(our_url, self.check._last_known_leader)
self.assertEqual(our_url, self.check._instance_states[instance_hash].last_known_leader)
event = self.events[0]
self.assertEqual(event['event_type'], 'consul.new_leader')
self.assertIn('prev_consul_leader:%s' % other_url, event['tags'])
Expand All @@ -521,9 +521,10 @@ def test_network_latency_checks(self):
mocks = self._get_consul_mocks()

# We start out as the leader, and stay that way
self.check._last_known_leader = self.mock_get_cluster_leader_A(None)
instance_hash = hash_mutable(MOCK_CONFIG_NETWORK_LATENCY_CHECKS['instances'][0])
self.check._instance_states[instance_hash].last_known_leader = self.mock_get_cluster_leader_A(None)

self.run_check(MOCK_CONFIG_SELF_LEADER_CHECK, mocks=mocks)
self.run_check(MOCK_CONFIG_NETWORK_LATENCY_CHECKS, mocks=mocks)

latency = [m for m in self.metrics if m[0].startswith('consul.net.')]
latency.sort()
Expand Down