Skip to content

Commit

Permalink
Merge pull request #1752 from DataDog/etienne/es-shard-level-metrics
Browse files Browse the repository at this point in the history
Es shard level metrics
  • Loading branch information
elafarge committed Jul 14, 2015
2 parents 3e6c0b1 + 0c7bbbb commit b19f251
Show file tree
Hide file tree
Showing 3 changed files with 367 additions and 25 deletions.
164 changes: 158 additions & 6 deletions checks.d/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ class NodeNotFound(Exception):

ESInstanceConfig = namedtuple(
'ESInstanceConfig', [
'shard_level_metrics',
'pshard_stats',
'cluster_stats',
'password',
'service_check_tags',
Expand All @@ -31,9 +33,59 @@ class NodeNotFound(Exception):
class ESCheck(AgentCheck):
SERVICE_CHECK_CONNECT_NAME = 'elasticsearch.can_connect'
SERVICE_CHECK_CLUSTER_STATUS = 'elasticsearch.cluster_health'
SERVICE_CHECK_SHARD_STATE = 'elasticsearch.shard.state'

DEFAULT_TIMEOUT = 5

# Pre aggregated metrics concerning only primary shards
# Those dicts are populated in the ESCheck constructor
PRIMARY_SHARD_METRICS = {}
PRIMARY_SHARD_METRICS_POST_1_0 = {}

# Same kind of dicts for SHARD_LEVEL_METRICS
SHARD_LEVEL_METRICS = {}
SHARD_LEVEL_METRICS_POST_1_0 = {}

# Shard-specific metrics
SHARD_LEVEL_METRICS_SUFFIX = {
".docs.count": ("gauge", "docs.count"),
".docs.deleted": ("gauge", "docs.deleted"),
".store.size": ("gauge", "store.size_in_bytes"),
".indexing.index.total": ("gauge", "indexing.index_total"),
".indexing.index.time": ("gauge", "indexing.index_time_in_millis", lambda v: float(v)/1000),
".indexing.index.current": ("gauge", "indexing.index_current"),
".indexing.delete.total": ("gauge", "indexing.delete_total"),
".indexing.delete.time": ("gauge", "indexing.delete_time_in_millis", lambda v: float(v)/1000),
".indexing.delete.current": ("gauge", "indexing.delete_current"),
".get.total": ("gauge", "get.total"),
".get.time": ("gauge", "get.time_in_millis", lambda v: float(v)/1000),
".get.current": ("gauge", "get.current"),
".get.exists.total": ("gauge", "get.exists_total"),
".get.exists.time": ("gauge", "get.exists_time_in_millis", lambda v: float(v)/1000),
".get.missing.total": ("gauge", "get.missing_total"),
".get.missing.time": ("gauge", "get.missing_time_in_millis", lambda v: float(v)/1000),
".search.query.total": ("gauge", "search.query_total"),
".search.query.time": ("gauge", "search.query_time_in_millis", lambda v: float(v)/1000),
".search.query.current": ("gauge", "search.query_current"),
".search.fetch.total": ("gauge", "search.fetch_total"),
".search.fetch.time": ("gauge", "search.fetch_time_in_millis", lambda v: float(v)/1000),
".search.fetch.current": ("gauge", "search.fetch_current")
}

SHARD_LEVEL_METRICS_POST_1_0_SUFFIX = {
".merges.current": ("gauge", "merges.current"),
".merges.current.docs": ("gauge", "merges.current_docs"),
".merges.current.size": ("gauge", "merges.current_size_in_bytes"),
".merges.total": ("gauge", "merges.total"),
".merges.total.time": ("gauge", "merges.total_time_in_millis", lambda v: float(v)/1000),
".merges.total.docs": ("gauge", "merges.total_docs"),
".merges.total.size": ("gauge", "merges.total_size_in_bytes"),
".refresh.total": ("gauge", "refresh.total"),
".refresh.total.time": ("gauge", "refresh.total_time_in_millis", lambda v: float(v)/1000),
".flush.total": ("gauge", "flush.total"),
".flush.total.time": ("gauge", "flush.total_time_in_millis", lambda v: float(v)/1000)
}

STATS_METRICS = { # Metrics that are common to all Elasticsearch versions
"elasticsearch.docs.count": ("gauge", "indices.docs.count"),
"elasticsearch.docs.deleted": ("gauge", "indices.docs.deleted"),
Expand Down Expand Up @@ -170,6 +222,18 @@ class ESCheck(AgentCheck):
SOURCE_TYPE_NAME = 'elasticsearch'

def __init__(self, name, init_config, agentConfig, instances=None):

# Let's construct the PRIMARY_SHARD_METRICS and SHARD_LEVEL_METRICS dicts
for k, v in self.SHARD_LEVEL_METRICS_SUFFIX.iteritems():
val = (v[0], '_all.primaries.{0}'.format(v[1]), v[2] if len(v) > 2 else None)
self.PRIMARY_SHARD_METRICS['elasticsearch.primaries{0}'.format(k)] = val
self.SHARD_LEVEL_METRICS['elasticsearch.shard{0}'.format(k)] = v

for k, v in self.SHARD_LEVEL_METRICS_POST_1_0_SUFFIX.iteritems():
val = (v[0], '_all.primaries.{0}'.format(v[1]), v[2] if len(v) > 2 else None)
self.PRIMARY_SHARD_METRICS_POST_1_0['elasticsearch.primaries{0}'.format(k)] = val
self.SHARD_LEVEL_METRICS_POST_1_0['elasticsearch.shard{0}'.format(k)] = v

AgentCheck.__init__(self, name, init_config, agentConfig, instances)

# Host status needs to persist across all checks
Expand All @@ -180,6 +244,9 @@ def get_instance_config(self, instance):
if url is None:
raise Exception("An url must be specified in the instance")

pshard_stats = _is_affirmative(instance.get('pshard_stats', False))
shard_level_metrics = _is_affirmative(instance.get('shard_level_metrics', False))

cluster_stats = _is_affirmative(instance.get('cluster_stats', False))
if 'is_external' in instance:
cluster_stats = _is_affirmative(instance.get('is_external', False))
Expand All @@ -204,6 +271,8 @@ def get_instance_config(self, instance):
timeout = instance.get('timeout') or self.DEFAULT_TIMEOUT

config = ESInstanceConfig(
shard_level_metrics=shard_level_metrics,
pshard_stats=pshard_stats,
cluster_stats=cluster_stats,
password=instance.get('password'),
service_check_tags=service_check_tags,
Expand All @@ -221,8 +290,21 @@ def check(self, instance):
# (URLs and metrics) accordingly
version = self._get_es_version(config)

health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics\
= self._define_params(version, config.cluster_stats)
health_url, nodes_url, stats_url, pshard_stats_url, pending_tasks_url, stats_metrics, \
pshard_stats_metrics, shard_level_metrics = self._define_params(version,
config.cluster_stats)

# Load primary shards data
if config.pshard_stats:
pshard_stats_url = urlparse.urljoin(config.url, pshard_stats_url)
pshard_stats_data = self._get_data(pshard_stats_url, config)
self._process_pshard_stats_data(pshard_stats_data, config, pshard_stats_metrics)

# Load shard-level metrics
if config.shard_level_metrics:
shard_level_url = urlparse.urljoin(config.url, pshard_stats_url) + '?level=shards'
shard_level_data = self._get_data(shard_level_url, config)
self._process_shard_level_data(shard_level_data, config, shard_level_metrics)

# Load stats data.
stats_url = urlparse.urljoin(config.url, stats_url)
Expand Down Expand Up @@ -268,6 +350,9 @@ def _define_params(self, version, cluster_stats):
""" Define the set of URLs and METRICS to use depending on the
running ES version.
"""

pshard_stats_url = "/_stats"

if version >= [0, 90, 10]:
# ES versions 0.90.10 and above
health_url = "/_cluster/health?pretty=true"
Expand Down Expand Up @@ -297,14 +382,21 @@ def _define_params(self, version, cluster_stats):

if version >= [0, 90, 5]:
# ES versions 0.90.5 and above
additional_metrics = self.ADDITIONAL_METRICS_POST_0_90_5
stats_metrics.update(self.ADDITIONAL_METRICS_POST_0_90_5)
else:
# ES version 0.90.4 and below
additional_metrics = self.ADDITIONAL_METRICS_PRE_0_90_5
stats_metrics.update(self.ADDITIONAL_METRICS_PRE_0_90_5)

stats_metrics.update(additional_metrics)
# Version specific stats metrics about the primary shards
pshard_stats_metrics = dict(self.PRIMARY_SHARD_METRICS)
shard_level_metrics = dict(self.SHARD_LEVEL_METRICS)

return health_url, nodes_url, stats_url, pending_tasks_url, stats_metrics
if version >= [1, 0, 0]:
pshard_stats_metrics.update(self.PRIMARY_SHARD_METRICS_POST_1_0)
shard_level_metrics.update(self.SHARD_LEVEL_METRICS_POST_1_0)

return health_url, nodes_url, stats_url, pshard_stats_url, pending_tasks_url, \
stats_metrics, pshard_stats_metrics, shard_level_metrics

def _get_data(self, url, config, send_sc=True):
""" Hit a given URL and return the parsed json
Expand Down Expand Up @@ -368,6 +460,66 @@ def _process_stats_data(self, nodes_url, data, stats_metrics, config):
node_data, metric, *desc, tags=config.tags,
hostname=metric_hostname)

def _process_pshard_stats_data(self, data, config, pshard_stats_metrics):
for metric, desc in pshard_stats_metrics.iteritems():
self._process_metric(data, metric, *desc, tags=config.tags)

def _process_shard_level_data(self, data, config, shard_level_metrics):
    """ Emit per-shard metrics and a state service check for every shard
    of every index present in `data`.

    data: parsed JSON payload of /_stats?level=shards
    config: the ESInstanceConfig for this instance (`tags` and
        `service_check_tags` are read)
    shard_level_metrics: dict mapping metric name -> (type, path[, xform])
    """
    for index_name, index_data in data['indices'].items():
        for shard_id, shard_group in index_data['shards'].items():
            # shard_group holds the primary plus all its replicas; with
            # more than one replica we number them to keep names unique
            has_multiple_replicas = len(shard_group) > 2
            replica_number = 0

            for shard in shard_group:
                routing = shard['routing']
                # Node hosting the shard, used as a tag
                # (presumably None/absent for unassigned shards — the tag
                # is emitted as-is either way)
                node = routing['node']

                # Shard naming convention (mirrors the ES docs): P<i> for
                # the primary, R<i> for a lone replica, R<i>_<n> when the
                # primary has several replicas
                shard_role = "replica"
                if routing['primary']:
                    shard_name = 'P' + shard_id
                    shard_role = "primary"
                elif has_multiple_replicas:
                    replica_number += 1
                    shard_name = 'R{0}_{1}'.format(shard_id, replica_number)
                else:
                    shard_name = 'R' + shard_id

                # Extra tags enabling slicing/dicing per node, shard,
                # index and role in DatadogHQ
                es_shard_tags = ["es_node:{0}".format(node),
                                 "es_shard:{0}".format(shard_name),
                                 "es_index:{0}".format(index_name),
                                 "es_role:{0}".format(shard_role),
                                 "shard_specific"]

                # Map the routing state to a service check status.
                # ES shard states are STARTED, INITIALIZING, RELOCATING
                # and UNASSIGNED.
                state_str = routing['state']
                if state_str == 'STARTED':
                    state = AgentCheck.OK
                    sc_msg = "Shard is running"
                elif state_str == 'INITIALIZING':
                    state = AgentCheck.WARNING
                    sc_msg = ("Shard is currently being initialized. It should turn green "
                              "(or red...) soon")
                elif state_str == 'RELOCATING':
                    # Bug fix: a relocating shard is active and serving;
                    # it previously fell through to CRITICAL with the
                    # misleading "unassigned" message
                    state = AgentCheck.WARNING
                    sc_msg = "Shard is being relocated to another node"
                else:
                    state = AgentCheck.CRITICAL
                    sc_msg = "Shard is unassigned. Do you have enough nodes in your cluster ? "

                self.service_check(
                    self.SERVICE_CHECK_SHARD_STATE,
                    state,
                    message=sc_msg,
                    tags=config.service_check_tags + es_shard_tags
                )

                # Finally send the per-shard metrics
                tags = config.tags + es_shard_tags
                for metric, desc in shard_level_metrics.items():
                    self._process_metric(shard, metric, *desc, tags=tags)

def _process_metric(self, data, metric, xtype, path, xform=None,
tags=None, hostname=None):
"""data: dictionary containing all the stats
Expand Down
27 changes: 26 additions & 1 deletion conf.d/elastic.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,35 @@ instances:
# This parameter was also called `is_external` and you can still use it but it
# will be removed in version 6.
#
# If you enable the "pshard_stats" flag, statistics over primary shards
# will be collected by the check and sent to the backend with the
# 'elasticsearch.primaries' prefix. It is particularly useful if you want to
# get certain metrics without taking replicas into account. For instance,
# `elasticsearch.primaries.docs.count` will give you the total number of
# documents in your indexes WITHOUT counting duplicates due to the existence
# of replica shards in your ES cluster
#
# The "shard_level_metrics" flag enables metrics and service checks on a per-
# shard basis (all the information is fetched under the /_stats?level=shards
# endpoint). The metrics and service check sent for each shard are named as
# such: elasticsearch.shard.metric.name .
# The shard name is computed according to elasticsearch's documentation. Each
# primary shard is named Pi (ex: P0, P1, P2...) and every replica shard will
# be named Ri (R0, R1, R2). If the number of replicas is greater than 1,
# we stick to the following convention for shard names: Rx_y where x is the
# number of the associated primary shard while y is the replica number.
#
# Please note that shard-level metrics will get the following extra tags:
# es_node:<node_name>, es_shard:<shard_name>, es_index:<index_name> and
# es_role:(primary|replica). They will also carry a "shard_specific" tag. It
# should enable you to slice and dice as you please in your DatadogHQ.
#
- url: http://localhost:9200
# username: username
# password: password
# is_external: false
# cluster_stats: false
# pshard_stats: false
# shard_level_metrics: true
# tags:
# - 'tag1:key1'
# - 'tag2:key2'
Loading

0 comments on commit b19f251

Please sign in to comment.